Skip to content

Commit

Permalink
[ws-daemon] Support config reload for IO limits
Browse files Browse the repository at this point in the history
  • Loading branch information
csweichel committed Apr 20, 2022
1 parent c81c93b commit 13c633a
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 41 deletions.
20 changes: 0 additions & 20 deletions components/ws-daemon/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
package cmd

import (
"bytes"
"encoding/json"
"fmt"
"os"

"github.com/spf13/cobra"

"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/common-go/tracing"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/config"
)

var (
Expand Down Expand Up @@ -49,23 +46,6 @@ func Execute() {
}
}

// getConfig loads the configuration from the file named by configFile.
//
// The file content is decoded as JSON with unknown fields disallowed. Both a
// read failure and a decode failure are reported through the logger's Fatal.
func getConfig() *config.Config {
	raw, readErr := os.ReadFile(configFile)
	if readErr != nil {
		log.WithError(readErr).Fatal("cannot read configuration. Maybe missing --config?")
	}

	// Strict decoding: a field the Config type does not declare is an error.
	decoder := json.NewDecoder(bytes.NewReader(raw))
	decoder.DisallowUnknownFields()

	var parsed config.Config
	if decodeErr := decoder.Decode(&parsed); decodeErr != nil {
		log.WithError(decodeErr).Fatal("cannot decode configuration. Maybe missing --config?")
	}

	return &parsed
}

func init() {
rootCmd.PersistentFlags().BoolVarP(&jsonLog, "json-log", "j", true, "produce JSON log output on verbose level")
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose JSON logging")
Expand Down
7 changes: 6 additions & 1 deletion components/ws-daemon/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ var runCmd = &cobra.Command{
Short: "Connects to the messagebus and starts the workspace monitor",

Run: func(cmd *cobra.Command, args []string) {
cfg := getConfig()
cfg, err := config.Read(configFile)
if err != nil {
log.WithError(err).Fatal("cannot read configuration. Maybe missing --config?")
}
reg := prometheus.NewRegistry()
dmn, err := daemon.NewDaemon(cfg.Daemon, prometheus.WrapRegistererWithPrefix("gitpod_ws_daemon_", reg))
if err != nil {
Expand Down Expand Up @@ -125,6 +128,8 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("cannot start daemon")
}

go config.Watch(configFile, dmn.ReloadConfig)

// run until we're told to stop
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
Expand Down
39 changes: 31 additions & 8 deletions components/ws-daemon/pkg/cgroup/plugin_iolimit_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

// IOLimiterV1 is the cgroup plugin that writes IO limits for a cgroup and
// supports replacing those limits while it is running.
type IOLimiterV1 struct {
// limits holds the limits that Apply writes; it is replaced whenever a new
// value is received from update.
limits limits
// update carries new limits from Update to the goroutine started by Apply.
// It is created with a buffer of one element.
update chan limits
}

type limits struct {
Expand All @@ -34,6 +35,7 @@ func NewIOLimiterV1(writeBytesPerSecond, readBytesPerSecond, writeIOPs, readIOPs
WriteIOPS: writeIOPs,
ReadIOPS: readIOPs,
},
update: make(chan limits, 1),
}
}

Expand All @@ -50,6 +52,15 @@ const (
// TODO: enable custom configuration
var blockDevices = []string{"sd*", "md*", "nvme0n*"}

// Update hands a new set of IO limits to the limiter.
//
// The limits are delivered through the single-slot update channel. Update never
// blocks: if an earlier update is still waiting to be picked up, it is discarded
// and replaced with the new one, because only the most recent configuration is
// relevant. A plain blocking send would hang the caller forever once the slot is
// full and nothing is receiving from the channel.
func (c *IOLimiterV1) Update(writeBytesPerSecond, readBytesPerSecond, writeIOPs, readIOPs int64) {
	next := limits{
		WriteBytesPerSecond: writeBytesPerSecond,
		ReadBytesPerSecond:  readBytesPerSecond,
		WriteIOPS:           writeIOPs,
		ReadIOPS:            readIOPs,
	}

	for {
		// Fast path: the slot is free (or a receiver is waiting).
		select {
		case c.update <- next:
			return
		default:
		}

		// The slot is occupied by a stale update - drop it and retry the send.
		// The receive is non-blocking because a receiver may have emptied the
		// slot in the meantime.
		select {
		case <-c.update:
		default:
		}
	}
}

func (c *IOLimiterV1) Apply(ctx context.Context, basePath, cgroupPath string) error {
var devices []string
for _, wc := range blockDevices {
Expand Down Expand Up @@ -119,15 +130,27 @@ func (c *IOLimiterV1) Apply(ctx context.Context, basePath, cgroupPath string) er
log.WithError(err).WithField("cgroupPath", cgroupPath).Error("cannot write IO limits")
}

<-ctx.Done()
// Prior to shutting down though, we need to reset the IO limits to ensure we don't have
// processes stuck in the uninterruptible "D" (disk sleep) state. This would prevent the
// workspace pod from shutting down.
err = writeLimits(limits{})
if err != nil {
log.WithError(err).WithField("cgroupPath", cgroupPath).Error("cannot reset IO limits")
for {
select {
case l := <-c.update:
c.limits = l
log.WithField("cgroupPath", cgroupPath).WithField("l", c.limits).Debug("writing new IO limiting")
err := writeLimits(c.limits)
if err != nil {
log.WithError(err).WithField("cgroupPath", cgroupPath).Error("cannot write IO limits")
}
case <-ctx.Done():
// Prior to shutting down though, we need to reset the IO limits to ensure we don't have
// processes stuck in the uninterruptible "D" (disk sleep) state. This would prevent the
// workspace pod from shutting down.
err = writeLimits(limits{})
if err != nil {
log.WithError(err).WithField("cgroupPath", cgroupPath).Error("cannot reset IO limits")
}
log.WithField("cgroupPath", cgroupPath).Debug("stopping IO limiting")
}
}
log.WithField("cgroupPath", cgroupPath).Debug("stopping IO limiting")

}()

return nil
Expand Down
83 changes: 83 additions & 0 deletions components/ws-daemon/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,99 @@
package config

import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"encoding/json"
"io"
"os"
"time"

"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/daemon"
)

// Read loads the configuration from the JSON file at fn.
//
// Decoding is strict: a field in the file that Config does not declare is an
// error. On failure the returned *Config is nil and the returned error wraps
// the underlying read or decode error.
func Read(fn string) (*Config, error) {
	ctnt, err := os.ReadFile(fn)
	if err != nil {
		return nil, xerrors.Errorf("cannot read config file: %w", err)
	}

	var cfg Config
	dec := json.NewDecoder(bytes.NewReader(ctnt))
	dec.DisallowUnknownFields()
	if err := dec.Decode(&cfg); err != nil {
		// err must be passed as the operand of %w, otherwise the cause of the
		// parse failure is missing from the message and from the error chain.
		return nil, xerrors.Errorf("cannot parse config file: %w", err)
	}

	return &cfg, nil
}

// Watch polls the configuration file fn every 30 seconds and calls cb with the
// freshly parsed daemon configuration whenever the file content has changed.
//
// Changes are detected by comparing SHA-256 hashes of the file content. Each
// invocation of cb receives a context that times out after 60 seconds.
// Failures to hash, parse or apply the configuration are logged and do not stop
// the watcher. Watch does not return while the ticker is running, so it is
// meant to be started in its own goroutine.
func Watch(fn string, cb func(context.Context, *daemon.Config) error) {
	hashConfig := func() (hash string, err error) {
		f, err := os.Open(fn)
		if err != nil {
			return "", err
		}
		defer f.Close()

		h := sha256.New()
		_, err = io.Copy(h, f)
		if err != nil {
			return "", err
		}

		return hex.EncodeToString(h.Sum(nil)), nil
	}
	reloadConfig := func() error {
		cfg, err := Read(fn)
		if err != nil {
			return err
		}

		ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
		defer cancel()

		return cb(ctx, &cfg.Daemon)
	}

	// Take the baseline immediately rather than on the first tick, so that a
	// change made during the first polling interval is not silently adopted as
	// the baseline. If this fails, oldHash stays empty and the first successful
	// hash in the loop becomes the baseline instead.
	oldHash, err := hashConfig()
	if err != nil {
		log.WithError(err).Warn("cannot check if config has changed")
	}

	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()
	for range tick.C {
		currentHash, err := hashConfig()
		if err != nil {
			log.WithError(err).Warn("cannot check if config has changed")
			// Without a hash there is nothing to compare. Falling through would
			// treat the empty hash as a change and reset the baseline, which
			// could hide a real change made in the meantime.
			continue
		}

		if oldHash == "" {
			// No baseline yet: remember this hash and wait for the next change.
			oldHash = currentHash
			continue
		}
		if currentHash == oldHash {
			continue
		}
		oldHash = currentHash

		if err := reloadConfig(); err != nil {
			log.WithError(err).Error("cannot reload config - config hot reloading did not work")
			continue
		}
		log.Info("configuration was updated - reloaded static layer config")
	}
}

type Config struct {
Daemon daemon.Config `json:"daemon"`
Service AddrTLS `json:"service"`
Expand Down
27 changes: 27 additions & 0 deletions components/ws-daemon/pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package daemon

import (
"context"

"github.com/gitpod-io/gitpod/ws-daemon/pkg/container"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/content"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/cpulimit"
Expand Down Expand Up @@ -38,3 +40,28 @@ type IOLimitConfig struct {
WriteIOPS int64 `json:"writeIOPS"`
ReadIOPS int64 `json:"readIOPS"`
}

// ConfigReloader is implemented by anything that can take over a new daemon
// configuration.
type ConfigReloader interface {
// ReloadConfig is called with the new configuration. A non-nil error
// signals that the configuration could not be applied.
ReloadConfig(context.Context, *Config) error
}

// ConfigReloaderFunc adapts a plain function to the ConfigReloader interface.
type ConfigReloaderFunc func(context.Context, *Config) error

// ReloadConfig calls f with the given context and configuration and returns
// its result unchanged.
func (f ConfigReloaderFunc) ReloadConfig(ctx context.Context, cfg *Config) error {
return f(ctx, cfg)
}

// CompositeConfigReloader is a list of reloaders that is itself a ConfigReloader.
type CompositeConfigReloader []ConfigReloader

// ReloadConfig passes cfg to every reloader in list order.
//
// It stops at the first reloader that returns an error and returns that error.
// After each successful reloader the context is checked; if it has been
// cancelled or has expired, the context's error is returned and the remaining
// reloaders are not called. It returns nil when all reloaders succeeded.
func (cs CompositeConfigReloader) ReloadConfig(ctx context.Context, cfg *Config) error {
	for idx := 0; idx < len(cs); idx++ {
		if reloadErr := cs[idx].ReloadConfig(ctx, cfg); reloadErr != nil {
			return reloadErr
		}

		// Give up between reloaders once the caller's deadline has passed.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
	}

	return nil
}
34 changes: 22 additions & 12 deletions components/ws-daemon/pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/ws-daemon/api"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/cgroup"
"github.com/gitpod-io/gitpod/ws-daemon/pkg/container"
Expand Down Expand Up @@ -52,12 +51,12 @@ func NewDaemon(config Config, reg prometheus.Registerer) (*Daemon, error) {
return nil, err
}

log.Warn("Creating plugin host")
cgroupV1IOLimiter := cgroup.NewIOLimiterV1(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS)
cgroupPlugins, err := cgroup.NewPluginHost(config.CPULimit.CGroupBasePath,
&cgroup.CacheReclaim{},
&cgroup.FuseDeviceEnablerV1{},
&cgroup.FuseDeviceEnablerV2{},
cgroup.NewIOLimiterV1(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS),
cgroupV1IOLimiter,
cgroup.NewIOLimiterV2(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS),
)
if err != nil {
Expand All @@ -68,7 +67,12 @@ func NewDaemon(config Config, reg prometheus.Registerer) (*Daemon, error) {
return nil, xerrors.Errorf("cannot register cgroup plugin metrics: %w", err)
}

log.Warn("Adding cgroup plugins")
var configReloader CompositeConfigReloader
configReloader = append(configReloader, ConfigReloaderFunc(func(ctx context.Context, config *Config) error {
cgroupV1IOLimiter.Update(config.IOLimit.WriteBWPerSecond.Value(), config.IOLimit.ReadBWPerSecond.Value(), config.IOLimit.WriteIOPS, config.IOLimit.ReadIOPS)
return nil
}))

listener := []dispatch.Listener{
cpulimit.NewDispatchListener(&config.CPULimit, reg),
markUnmountFallback,
Expand Down Expand Up @@ -104,10 +108,11 @@ func NewDaemon(config Config, reg prometheus.Registerer) (*Daemon, error) {
return &Daemon{
Config: config,

dispatch: dsptch,
content: contentService,
diskGuards: dsk,
hosts: hsts,
dispatch: dsptch,
content: contentService,
diskGuards: dsk,
hosts: hsts,
configReloader: configReloader,
}, nil
}

Expand Down Expand Up @@ -137,10 +142,15 @@ func newClientSet(kubeconfig string) (res *kubernetes.Clientset, err error) {
// Daemon holds the configuration and the components assembled by NewDaemon.
type Daemon struct {
Config Config

dispatch *dispatch.Dispatch
content *content.WorkspaceService
diskGuards []*diskguard.Guard
hosts hosts.Controller
dispatch *dispatch.Dispatch
content *content.WorkspaceService
diskGuards []*diskguard.Guard
hosts hosts.Controller
// configReloader is what ReloadConfig delegates to.
configReloader ConfigReloader
}

// ReloadConfig forwards the new configuration to the daemon's configReloader
// and returns its result.
func (d *Daemon) ReloadConfig(ctx context.Context, cfg *Config) error {
return d.configReloader.ReloadConfig(ctx, cfg)
}

// Start runs all parts of the daemon until stop is called
Expand Down

0 comments on commit 13c633a

Please sign in to comment.