From 8ec9e80f2f8d8fa0405b34c0be64faa8414b3ee9 Mon Sep 17 00:00:00 2001 From: Thomas Schubart Date: Sun, 27 Feb 2022 20:37:42 +0000 Subject: [PATCH] Support cpulimit for cgroup v2 --- components/ws-daemon/pkg/cpulimit/cfs.go | 22 +-- components/ws-daemon/pkg/cpulimit/cfs_test.go | 8 +- components/ws-daemon/pkg/cpulimit/cfs_v2.go | 125 ++++++++++++++++++ components/ws-daemon/pkg/cpulimit/cpulimit.go | 9 ++ components/ws-daemon/pkg/cpulimit/dispatch.go | 50 ++++++- 5 files changed, 197 insertions(+), 17 deletions(-) create mode 100644 components/ws-daemon/pkg/cpulimit/cfs_v2.go diff --git a/components/ws-daemon/pkg/cpulimit/cfs.go b/components/ws-daemon/pkg/cpulimit/cfs.go index e1ffb7f71e44ad..e7daf3516b2760 100644 --- a/components/ws-daemon/pkg/cpulimit/cfs.go +++ b/components/ws-daemon/pkg/cpulimit/cfs.go @@ -16,11 +16,11 @@ import ( "golang.org/x/xerrors" ) -// CgroupCFSController controls a cgroup's CFS settings -type CgroupCFSController string +// CgroupV1CFSController controls a cgroup's CFS settings +type CgroupV1CFSController string // Usage returns the cpuacct.usage value of the cgroup -func (basePath CgroupCFSController) Usage() (usage CPUTime, err error) { +func (basePath CgroupV1CFSController) Usage() (usage CPUTime, err error) { cputime, err := basePath.readCpuUsage() if err != nil { return 0, xerrors.Errorf("cannot read cpuacct.usage: %w", err) @@ -30,7 +30,7 @@ func (basePath CgroupCFSController) Usage() (usage CPUTime, err error) { } // SetQuota sets a new CFS quota on the cgroup -func (basePath CgroupCFSController) SetLimit(limit Bandwidth) (changed bool, err error) { +func (basePath CgroupV1CFSController) SetLimit(limit Bandwidth) (changed bool, err error) { period, err := basePath.readCfsPeriod() if err != nil { err = xerrors.Errorf("cannot parse CFS period: %w", err) @@ -55,8 +55,8 @@ func (basePath CgroupCFSController) SetLimit(limit Bandwidth) (changed bool, err return true, nil } -func (basePath CgroupCFSController) readParentQuota() time.Duration { - parent := CgroupCFSController(filepath.Dir(string(basePath))) +func (basePath CgroupV1CFSController) readParentQuota() time.Duration { + parent := CgroupV1CFSController(filepath.Dir(string(basePath))) pq, err := parent.readCfsQuota() if err != nil { return time.Duration(0) @@ -65,7 +65,7 @@ func (basePath CgroupCFSController) readParentQuota() time.Duration { return time.Duration(pq) * time.Microsecond } -func (basePath CgroupCFSController) readString(path string) (string, error) { +func (basePath CgroupV1CFSController) readString(path string) (string, error) { fn := filepath.Join(string(basePath), path) fc, err := os.ReadFile(fn) if err != nil { @@ -76,7 +76,7 @@ func (basePath CgroupCFSController) readString(path string) (string, error) { return s, nil } -func (basePath CgroupCFSController) readCfsPeriod() (time.Duration, error) { +func (basePath CgroupV1CFSController) readCfsPeriod() (time.Duration, error) { s, err := basePath.readString("cpu.cfs_period_us") if err != nil { return 0, err @@ -89,7 +89,7 @@ func (basePath CgroupCFSController) readCfsPeriod() (time.Duration, error) { return time.Duration(uint64(p)) * time.Microsecond, nil } -func (basePath CgroupCFSController) readCfsQuota() (time.Duration, error) { +func (basePath CgroupV1CFSController) readCfsQuota() (time.Duration, error) { s, err := basePath.readString("cpu.cfs_quota_us") if err != nil { return 0, err @@ -106,7 +106,7 @@ func (basePath CgroupCFSController) readCfsQuota() (time.Duration, error) { return time.Duration(p) * time.Microsecond, nil } -func (basePath CgroupCFSController) readCpuUsage() (time.Duration, error) { +func (basePath CgroupV1CFSController) readCpuUsage() (time.Duration, error) { s, err := basePath.readString("cpuacct.usage") if err != nil { return 0, err @@ -120,7 +120,7 @@ func (basePath CgroupCFSController) readCpuUsage() (time.Duration, error) { } // NrThrottled returns the number of CFS periods the cgroup was throttled in -func (basePath CgroupCFSController) NrThrottled() (uint64, error) { +func (basePath CgroupV1CFSController) NrThrottled() (uint64, error) { f, err := os.Open(filepath.Join(string(basePath), "cpu.stat")) if err != nil { return 0, xerrors.Errorf("cannot read cpu.stat: %w", err) diff --git a/components/ws-daemon/pkg/cpulimit/cfs_test.go b/components/ws-daemon/pkg/cpulimit/cfs_test.go index b62e1e3da09c25..14bd36ddf9dfb0 100644 --- a/components/ws-daemon/pkg/cpulimit/cfs_test.go +++ b/components/ws-daemon/pkg/cpulimit/cfs_test.go @@ -69,7 +69,7 @@ func TestCfsSetLimit(t *testing.T) { t.Fatal(err) } - cfs := CgroupCFSController(tempdir) + cfs := CgroupV1CFSController(tempdir) changed, err := cfs.SetLimit(tc.bandWidth) if err != nil { t.Fatal(err) @@ -110,7 +110,7 @@ func TestReadCfsQuota(t *testing.T) { t.Fatal(err) } - cfs := CgroupCFSController(tempdir) + cfs := CgroupV1CFSController(tempdir) v, err := cfs.readCfsQuota() if err != nil { t.Fatal(err) @@ -132,7 +132,7 @@ func TestReadCfsPeriod(t *testing.T) { t.Fatal(err) } - cfs := CgroupCFSController(tempdir) + cfs := CgroupV1CFSController(tempdir) v, err := cfs.readCfsPeriod() if err != nil { t.Fatal(err) @@ -155,7 +155,7 @@ func TestReadCpuUsage(t *testing.T) { t.Fatal(err) } - cfs := CgroupCFSController(tempdir) + cfs := CgroupV1CFSController(tempdir) v, err := cfs.readCpuUsage() if err != nil { t.Fatal(err) diff --git a/components/ws-daemon/pkg/cpulimit/cfs_v2.go b/components/ws-daemon/pkg/cpulimit/cfs_v2.go new file mode 100644 index 00000000000000..81ed37f306a315 --- /dev/null +++ b/components/ws-daemon/pkg/cpulimit/cfs_v2.go @@ -0,0 +1,125 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package cpulimit + +import ( + "bufio" + "math" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "golang.org/x/xerrors" +) + +type CgroupV2CFSController string + +func (basePath CgroupV2CFSController) Usage() (CPUTime, error) { + usage, err := basePath.getFlatKeyedValue("usage_usec") + if err != nil { + return 0, err + } + + return CPUTime(time.Duration(usage) * time.Microsecond), nil +} + +func (basePath CgroupV2CFSController) SetLimit(limit Bandwidth) (changed bool, err error) { + quota, period, err := basePath.readCpuMax() + if err != nil { + return false, xerrors.Errorf("failed to read cpu max from %s: %w", basePath, err) + } + + target := limit.Quota(period) + if quota == target { + return false, nil + } + + err = basePath.writeQuota(target) + if err != nil { + return false, xerrors.Errorf("cannot set CFS quota of %d (period is %d, parent quota is %d): %w", + target.Microseconds(), period.Microseconds(), basePath.readParentQuota().Microseconds(), err) + } + + return true, nil +} + +func (basePath CgroupV2CFSController) NrThrottled() (uint64, error) { + throttled, err := basePath.getFlatKeyedValue("nr_throttled") + if err != nil { + return 0, err + } + + return uint64(throttled), nil +} + +func (basePath CgroupV2CFSController) readCpuMax() (time.Duration, time.Duration, error) { + cpuMaxPath := filepath.Join(string(basePath), "cpu.max") + cpuMax, err := os.ReadFile(cpuMaxPath) + if err != nil { + return 0, 0, xerrors.Errorf("unable to read cpu.max: %w", err) + } + + parts := strings.Fields(string(cpuMax)) + if len(parts) < 2 { + return 0, 0, xerrors.Errorf("cpu.max did not have expected number of fields: %s", parts) + } + + var quota int64 + if parts[0] == "max" { + quota = math.MaxInt64 + } else { + quota, err = strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return 0, 0, xerrors.Errorf("could not parse quota of %s: %w", parts[0], err) + } + } + + period, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return 0, 0, xerrors.Errorf("could not parse period of %s: %w", parts[1], err) + } + + return time.Duration(quota) * time.Microsecond, time.Duration(period) * time.Microsecond, nil +} + +func (basePath CgroupV2CFSController) writeQuota(quota time.Duration) error { + cpuMaxPath := filepath.Join(string(basePath), "cpu.max") + return os.WriteFile(cpuMaxPath, []byte(strconv.FormatInt(quota.Microseconds(), 10)), 0644) +} + +func (basePath CgroupV2CFSController) readParentQuota() time.Duration { + parent := CgroupV2CFSController(filepath.Dir(string(basePath))) + quota, _, err := parent.readCpuMax() + if err != nil { + return time.Duration(0) + } + + return time.Duration(quota) +} + +func (basePath CgroupV2CFSController) getFlatKeyedValue(key string) (int64, error) { + stats, err := os.Open(filepath.Join(string(basePath), "cpu.stat")) + if err != nil { + return 0, xerrors.Errorf("cannot read cpu.stat: %w", err) + } + defer stats.Close() + + scanner := bufio.NewScanner(stats) + for scanner.Scan() { + entry := scanner.Text() + if !strings.HasPrefix(entry, key) { + continue + } + + r, err := strconv.ParseInt(strings.TrimSpace(strings.TrimPrefix(entry, key)), 10, 64) + if err != nil { + return 0, xerrors.Errorf("cannot parse cpu.stat: %s: %w", entry, err) + } + return int64(r), nil + } + return 0, xerrors.Errorf("cpu.stat did not contain nr_throttled") +} diff --git a/components/ws-daemon/pkg/cpulimit/cpulimit.go b/components/ws-daemon/pkg/cpulimit/cpulimit.go index 1cde5d9a98e713..8a39d73f675e78 100644 --- a/components/ws-daemon/pkg/cpulimit/cpulimit.go +++ b/components/ws-daemon/pkg/cpulimit/cpulimit.go @@ -294,3 +294,12 @@ func (bl *ClampingBucketLimiter) Limit(budgetSpent CPUTime) (newLimit Bandwidth) // empty bucket list return 0 } + +type CFSController interface { + // Usage returns the cpuacct.usage value of the cgroup + Usage() (usage CPUTime, err error) + // SetQuota sets a new CFS quota on the cgroup + SetLimit(limit Bandwidth) (changed bool, err error) + // NrThrottled returns the number of CFS periods the cgroup was throttled + NrThrottled() (uint64, error) +} diff --git a/components/ws-daemon/pkg/cpulimit/dispatch.go b/components/ws-daemon/pkg/cpulimit/dispatch.go index 4e33ebf8d675c8..297fd234d09591 100644 --- a/components/ws-daemon/pkg/cpulimit/dispatch.go +++ b/components/ws-daemon/pkg/cpulimit/dispatch.go @@ -6,10 +6,14 @@ package cpulimit import ( "context" + "os" "path/filepath" + "strings" "sync" "time" + "github.com/opencontainers/runc/libcontainer/cgroups/fs2" + "github.com/opencontainers/runc/libcontainer/configs" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "golang.org/x/xerrors" @@ -97,7 +101,7 @@ type DispatchListener struct { } type workspace struct { - CFS CgroupCFSController + CFS CFSController OWI logrus.Fields HardLimit ResourceLimiter @@ -175,8 +179,13 @@ func (d *DispatchListener) WorkspaceAdded(ctx context.Context, ws *dispatch.Work return xerrors.Errorf("cannot start governer: %w", err) } + controller, err := newCFSController(d.Config.CGroupBasePath, cgroupPath) + if err != nil { + return err + } + d.workspaces[ws.InstanceID] = &workspace{ - CFS: CgroupCFSController(filepath.Join(d.Config.CGroupBasePath, "cpu", cgroupPath)), + CFS: controller, OWI: ws.OWI(), } go func() { @@ -214,3 +223,40 @@ func (d *DispatchListener) WorkspaceUpdated(ctx context.Context, ws *dispatch.Wo return nil } + +func newCFSController(basePath, cgroupPath string) (CFSController, error) { + controllers := filepath.Join(basePath, "cgroup.controllers") + _, err := os.Stat(controllers) + + if os.IsNotExist(err) { + return CgroupV1CFSController(filepath.Join(basePath, "cpu", cgroupPath)), nil + } + + if err == nil { + fullPath := filepath.Join(basePath, cgroupPath) + if err := ensureControllerEnabled(fullPath, "cpu"); err != nil { + return nil, err + } + + return CgroupV2CFSController(fullPath), nil + } + + return nil, err +} + +func ensureControllerEnabled(targetPath, controller string) error { + controllerFile := filepath.Join(targetPath, "cgroup.controllers") + controllers, err := os.ReadFile(controllerFile) + if err != nil { + return err + } + + for _, ctrl := range strings.Fields(string(controllers)) { + if ctrl == controller { + // controller is already activated + return nil + } + } + + return fs2.CreateCgroupPath(targetPath, &configs.Cgroup{}) +}