Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ws-daemon] Support config reload for IO limits #9440

Merged
merged 1 commit into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions components/ws-daemon/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
package cmd

import (
"bytes"
"encoding/json"
"fmt"
"os"

"github.com/spf13/cobra"

"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/common-go/tracing"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/config"
)

var (
Expand Down Expand Up @@ -49,23 +46,6 @@ func Execute() {
}
}

// getConfig loads the JSON configuration from the path stored in configFile
// and decodes it into a config.Config. Decoding is strict: unknown fields are
// rejected. Read and decode failures are both reported through log.Fatal.
func getConfig() *config.Config {
	raw, readErr := os.ReadFile(configFile)
	if readErr != nil {
		log.WithError(readErr).Fatal("cannot read configuration. Maybe missing --config?")
	}

	// Strict decoding so that a misspelled or stale key surfaces as an error.
	decoder := json.NewDecoder(bytes.NewReader(raw))
	decoder.DisallowUnknownFields()

	cfg := new(config.Config)
	if decodeErr := decoder.Decode(cfg); decodeErr != nil {
		log.WithError(decodeErr).Fatal("cannot decode configuration. Maybe missing --config?")
	}

	return cfg
}

func init() {
rootCmd.PersistentFlags().BoolVarP(&jsonLog, "json-log", "j", true, "produce JSON log output on verbose level")
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose JSON logging")
Expand Down
7 changes: 6 additions & 1 deletion components/ws-daemon/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ var runCmd = &cobra.Command{
Short: "Connects to the messagebus and starts the workspace monitor",

Run: func(cmd *cobra.Command, args []string) {
cfg := getConfig()
cfg, err := config.Read(configFile)
if err != nil {
log.WithError(err).Fatal("cannot read configuration. Maybe missing --config?")
}
reg := prometheus.NewRegistry()
dmn, err := daemon.NewDaemon(cfg.Daemon, prometheus.WrapRegistererWithPrefix("gitpod_ws_daemon_", reg))
if err != nil {
Expand Down Expand Up @@ -125,6 +128,8 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("cannot start daemon")
}

go config.Watch(configFile, dmn.ReloadConfig)

// run until we're told to stop
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
Expand Down
59 changes: 51 additions & 8 deletions components/ws-daemon/pkg/cgroup/plugin_iolimit_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
"os"
"path/filepath"
"strings"
"sync"

"github.com/gitpod-io/gitpod/common-go/log"
"golang.org/x/xerrors"
)

// IOLimiterV1 holds a set of IO limits together with a condition variable
// that Update broadcasts on after it has replaced those limits.
type IOLimiterV1 struct {
// limits is the currently configured set of IO limits.
limits limits

// cond is broadcast on by Update whenever limits has been replaced; its
// Locker is held while limits is being swapped.
cond *sync.Cond
}

type limits struct {
Expand All @@ -34,6 +37,7 @@ func NewIOLimiterV1(writeBytesPerSecond, readBytesPerSecond, writeIOPs, readIOPs
WriteIOPS: writeIOPs,
ReadIOPS: readIOPs,
},
cond: sync.NewCond(&sync.Mutex{}),
}
}

Expand All @@ -50,6 +54,19 @@ const (
// TODO: enable custom configuration
var blockDevices = []string{"sd*", "md*", "nvme0n*"}

// Update replaces the limiter's IO limits with the given values and then
// broadcasts on the limiter's condition variable so that goroutines waiting
// on it wake up. Both the swap and the broadcast happen while the condition's
// lock is held.
func (c *IOLimiterV1) Update(writeBytesPerSecond, readBytesPerSecond, writeIOPs, readIOPs int64) {
	next := limits{
		WriteBytesPerSecond: writeBytesPerSecond,
		ReadBytesPerSecond:  readBytesPerSecond,
		WriteIOPS:           writeIOPs,
		ReadIOPS:            readIOPs,
	}

	mu := c.cond.L
	mu.Lock()
	defer mu.Unlock()

	c.limits = next
	c.cond.Broadcast()
}

func (c *IOLimiterV1) Apply(ctx context.Context, basePath, cgroupPath string) error {
var devices []string
for _, wc := range blockDevices {
Expand Down Expand Up @@ -112,22 +129,48 @@ func (c *IOLimiterV1) Apply(ctx context.Context, basePath, cgroupPath string) er
return nil
}

update := make(chan struct{}, 1)
go func() {
// TODO(cw): this Go-routine will leak per workspace, until we update config or restart ws-daemon
defer close(update)
for {
c.cond.L.Lock()
c.cond.Wait()
c.cond.L.Unlock()

if ctx.Err() != nil {
return
}
update <- struct{}{}
}
}()
go func() {
log.WithField("cgroupPath", cgroupPath).Debug("starting IO limiting")
err := writeLimits(c.limits)
if err != nil {
log.WithError(err).WithField("cgroupPath", cgroupPath).Error("cannot write IO limits")
}

<-ctx.Done()
// Prior to shutting down though, we need to reset the IO limits to ensure we don't have
// processes stuck in the uninterruptible "D" (disk sleep) state. This would prevent the
// workspace pod from shutting down.
err = writeLimits(limits{})
if err != nil {
log.WithError(err).WithField("cgroupPath", cgroupPath).Error("cannot reset IO limits")
for {
select {
case <-update:
log.WithField("cgroupPath", cgroupPath).WithField("l", c.limits).Debug("writing new IO limiting")
err := writeLimits(c.limits)
if err != nil {
log.WithError(err).WithField("cgroupPath", cgroupPath).Error("cannot write IO limits")
}
case <-ctx.Done():
// Prior to shutting down though, we need to reset the IO limits to ensure we don't have
// processes stuck in the uninterruptible "D" (disk sleep) state. This would prevent the
// workspace pod from shutting down.
err = writeLimits(limits{})
if err != nil {
log.WithError(err).WithField("cgroupPath", cgroupPath).Error("cannot reset IO limits")
}
log.WithField("cgroupPath", cgroupPath).Debug("stopping IO limiting")
}
}
log.WithField("cgroupPath", cgroupPath).Debug("stopping IO limiting")

}()

return nil
Expand Down
83 changes: 83 additions & 0 deletions components/ws-daemon/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,99 @@
package config

import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"encoding/json"
"io"
"os"
"time"

"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/daemon"
)

// Read loads the JSON configuration file at fn and decodes it into a Config.
// Decoding is strict: a field present in the file that Config does not
// declare is reported as an error. On failure the returned *Config is nil and
// the error wraps the underlying read or parse error.
func Read(fn string) (*Config, error) {
	ctnt, err := os.ReadFile(fn)
	if err != nil {
		return nil, xerrors.Errorf("cannot read config file: %w", err)
	}

	var cfg Config
	dec := json.NewDecoder(bytes.NewReader(ctnt))
	dec.DisallowUnknownFields()
	if err := dec.Decode(&cfg); err != nil {
		// err must be passed as the %w operand; without it the cause is
		// missing from both the message and the error chain.
		return nil, xerrors.Errorf("cannot parse config file: %w", err)
	}

	return &cfg, nil
}

// Watch polls the configuration file fn every 30 seconds and calls cb with
// the freshly parsed daemon configuration whenever the file's SHA-256 digest
// differs from the digest recorded at the previous successful check. The
// first successful check only records the baseline digest and does not call
// cb. Each cb invocation receives a context that expires after 60 seconds.
//
// Watch never returns on its own and is meant to run in its own goroutine.
func Watch(fn string, cb func(context.Context, *daemon.Config) error) {
	// hashConfig returns the hex-encoded SHA-256 digest of the file's content.
	hashConfig := func() (hash string, err error) {
		f, err := os.Open(fn)
		if err != nil {
			return "", err
		}
		defer f.Close()

		h := sha256.New()
		_, err = io.Copy(h, f)
		if err != nil {
			return "", err
		}

		return hex.EncodeToString(h.Sum(nil)), nil
	}
	// reloadConfig re-reads the file and hands the daemon section to cb.
	reloadConfig := func() error {
		cfg, err := Read(fn)
		if err != nil {
			return err
		}

		ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
		defer cancel()

		return cb(ctx, &cfg.Daemon)
	}

	var (
		tick    = time.NewTicker(30 * time.Second)
		oldHash string
	)
	defer tick.Stop()
	for range tick.C {
		currentHash, err := hashConfig()
		if err != nil {
			log.WithError(err).Warn("cannot check if config has changed")
			// Skip this round and keep the previous baseline. Falling through
			// with an empty digest would reset the baseline, trigger a
			// spurious reload, and cause the next readable version of the
			// file to be adopted as the baseline without being reloaded.
			continue
		}

		if oldHash == "" {
			// First successful check: remember the digest, nothing to reload.
			oldHash = currentHash
		}
		if currentHash == oldHash {
			continue
		}
		// The digest is recorded before reloading, so a config that fails to
		// load is not retried until the file changes again.
		oldHash = currentHash

		err = reloadConfig()
		if err == nil {
			log.Info("configuration was updated - reloaded static layer config")
		} else {
			log.WithError(err).Error("cannot reload config - config hot reloading did not work")
		}
	}
}

type Config struct {
Daemon daemon.Config `json:"daemon"`
Service AddrTLS `json:"service"`
Expand Down
27 changes: 27 additions & 0 deletions components/ws-daemon/pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package daemon

import (
"context"

"github.com/gitpod-io/gitpod/ws-daemon/pkg/container"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/content"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/cpulimit"
Expand Down Expand Up @@ -38,3 +40,28 @@ type IOLimitConfig struct {
WriteIOPS int64 `json:"writeIOPS"`
ReadIOPS int64 `json:"readIOPS"`
}

// ConfigReloader is implemented by anything that accepts a new daemon Config
// through ReloadConfig.
type ConfigReloader interface {
// ReloadConfig hands cfg to the implementation and returns an error if the
// implementation reports one.
ReloadConfig(context.Context, *Config) error
}

// ConfigReloaderFunc adapts a plain function with the matching signature so
// that it satisfies ConfigReloader.
type ConfigReloaderFunc func(context.Context, *Config) error

// ReloadConfig calls f with ctx and cfg and returns whatever f returns.
func (f ConfigReloaderFunc) ReloadConfig(ctx context.Context, cfg *Config) error {
return f(ctx, cfg)
}

// CompositeConfigReloader fans a configuration reload out to a list of
// ConfigReloaders.
type CompositeConfigReloader []ConfigReloader

// ReloadConfig passes cfg to each reloader in slice order. It stops at, and
// returns, the first error a reloader reports. After each reloader that
// succeeded it also checks ctx and returns ctx.Err() if that is non-nil.
// It returns nil once every reloader has run.
func (cs CompositeConfigReloader) ReloadConfig(ctx context.Context, cfg *Config) error {
	for i := range cs {
		if err := cs[i].ReloadConfig(ctx, cfg); err != nil {
			return err
		}
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
	}
	return nil
}
34 changes: 22 additions & 12 deletions components/ws-daemon/pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/ws-daemon/api"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/cgroup"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/container"
Expand Down Expand Up @@ -52,12 +51,12 @@ func NewDaemon(config Config, reg prometheus.Registerer) (*Daemon, error) {
return nil, err
}

log.Warn("Creating plugin host")
cgroupV1IOLimiter := cgroup.NewIOLimiterV1(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS)
cgroupPlugins, err := cgroup.NewPluginHost(config.CPULimit.CGroupBasePath,
&cgroup.CacheReclaim{},
&cgroup.FuseDeviceEnablerV1{},
&cgroup.FuseDeviceEnablerV2{},
cgroup.NewIOLimiterV1(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS),
cgroupV1IOLimiter,
cgroup.NewIOLimiterV2(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS),
)
if err != nil {
Expand All @@ -68,7 +67,12 @@ func NewDaemon(config Config, reg prometheus.Registerer) (*Daemon, error) {
return nil, xerrors.Errorf("cannot register cgroup plugin metrics: %w", err)
}

log.Warn("Adding cgroup plugins")
var configReloader CompositeConfigReloader
configReloader = append(configReloader, ConfigReloaderFunc(func(ctx context.Context, config *Config) error {
cgroupV1IOLimiter.Update(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS)
return nil
}))

listener := []dispatch.Listener{
cpulimit.NewDispatchListener(&config.CPULimit, reg),
markUnmountFallback,
Expand Down Expand Up @@ -104,10 +108,11 @@ func NewDaemon(config Config, reg prometheus.Registerer) (*Daemon, error) {
return &Daemon{
Config: config,

dispatch: dsptch,
content: contentService,
diskGuards: dsk,
hosts: hsts,
dispatch: dsptch,
content: contentService,
diskGuards: dsk,
hosts: hsts,
configReloader: configReloader,
}, nil
}

Expand Down Expand Up @@ -137,10 +142,15 @@ func newClientSet(kubeconfig string) (res *kubernetes.Clientset, err error) {
// Daemon holds the daemon configuration together with the components that
// NewDaemon wires up.
type Daemon struct {
Config Config

dispatch *dispatch.Dispatch
content *content.WorkspaceService
diskGuards []*diskguard.Guard
hosts hosts.Controller
dispatch *dispatch.Dispatch
content *content.WorkspaceService
diskGuards []*diskguard.Guard
hosts hosts.Controller
// configReloader is what ReloadConfig delegates to.
configReloader ConfigReloader
}

// ReloadConfig forwards cfg to the daemon's configReloader and returns its
// result.
func (d *Daemon) ReloadConfig(ctx context.Context, cfg *Config) error {
return d.configReloader.ReloadConfig(ctx, cfg)
}

// Start runs all parts of the daemon until stop is called
Expand Down