Skip to content

Commit

Permalink
feat(linux/cpu): 🚚 split cpu usage and frequency workers
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuar committed Dec 14, 2024
1 parent 748b48f commit cc18b67
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 199 deletions.
1 change: 1 addition & 0 deletions internal/agent/workers_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var sensorPollingWorkersInitFuncs = []func(ctx context.Context) (*linux.PollingS
disk.NewUsageWorker,
cpu.NewLoadAvgWorker,
cpu.NewUsageWorker,
cpu.NewFreqWorker,
mem.NewUsageWorker,
net.NewNetStatsWorker,
problems.NewProblemsWorker,
Expand Down
18 changes: 0 additions & 18 deletions internal/linux/cpu/common.go

This file was deleted.

81 changes: 81 additions & 0 deletions internal/linux/cpu/cpufreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,29 @@ package cpu

import (
"bytes"
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"github.com/joshuar/go-hass-agent/internal/hass/sensor"
"github.com/joshuar/go-hass-agent/internal/hass/sensor/types"
"github.com/joshuar/go-hass-agent/internal/linux"
"github.com/joshuar/go-hass-agent/internal/logging"
"github.com/joshuar/go-hass-agent/internal/preferences"
)

const (
cpuFreqUpdateInterval = 30 * time.Second
cpuFreqUpdateJitter = time.Second

cpuFreqWorkerID = "cpu_freq_sensors"

freqFile = "cpufreq/scaling_cur_freq"
governorFile = "cpufreq/scaling_governor"
driverFile = "cpufreq/scaling_driver"
Expand All @@ -27,6 +39,38 @@ const (
cpuFreqUnits = "kHz"
)

var totalCPUs = runtime.NumCPU()

// FreqWorkerPrefs are the preferences for the CPU frequency worker.
type FreqWorkerPrefs struct {
UpdateInterval string `toml:"update_interval" comment:"Time between updates of CPU frequency sensors (default 30s)."`
preferences.CommonWorkerPrefs
}

type freqWorker struct{}

func (w *freqWorker) UpdateDelta(_ time.Duration) {}

func (w *freqWorker) Sensors(_ context.Context) ([]sensor.Entity, error) {
sensors := make([]sensor.Entity, totalCPUs)

for i := range totalCPUs {
sensors[i] = newCPUFreqSensor("cpu" + strconv.Itoa(i))
}

return sensors, nil
}

func (w *freqWorker) PreferencesID() string {
return cpuFreqWorkerID
}

func (w *freqWorker) DefaultPreferences() FreqWorkerPrefs {
return FreqWorkerPrefs{
UpdateInterval: cpuFreqUpdateInterval.String(),
}
}

type cpuFreq struct {
cpu string
governor string
Expand Down Expand Up @@ -86,3 +130,40 @@ func readCPUFreqProp(id, file string) string {

return string(bytes.TrimSpace(prop))
}

func NewFreqWorker(ctx context.Context) (*linux.PollingSensorWorker, error) {
var err error

pollWorker := linux.NewPollingSensorWorker(cpuFreqWorkerID, cpuFreqUpdateInterval, cpuFreqUpdateJitter)

worker := &freqWorker{}

prefs, err := preferences.LoadWorker(ctx, worker)
if err != nil {
return pollWorker, fmt.Errorf("could not load preferences: %w", err)
}

// If disabled, don't use the addressWorker.
if prefs.Disabled {
return pollWorker, nil
}

interval, err := time.ParseDuration(prefs.UpdateInterval)
if err != nil {
logging.FromContext(ctx).Warn("Could not parse update interval, using default value.",
slog.String("requested_value", prefs.UpdateInterval),
slog.String("default_value", cpuFreqUpdateInterval.String()))
// Save preferences with default interval value.
prefs.UpdateInterval = cpuFreqUpdateInterval.String()
if err := preferences.SaveWorker(ctx, worker, *prefs); err != nil {
logging.FromContext(ctx).Warn("Could not save preferences.", slog.Any("error", err))
}

interval = cpuUsageUpdateInterval
}

pollWorker.PollInterval = interval
pollWorker.PollingSensorType = worker

return pollWorker, nil
}
172 changes: 168 additions & 4 deletions internal/linux/cpu/usage.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
// Copyright (c) 2024 Joshua Rich <[email protected]>
//
// This software is released under the MIT License.
// https://opensource.org/licenses/MIT
// Copyright 2024 Joshua Rich <[email protected]>.
// SPDX-License-Identifier: MIT

//revive:disable:unused-receiver
package cpu

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"strconv"
"strings"
"time"
Expand All @@ -15,8 +22,165 @@ import (
"github.com/joshuar/go-hass-agent/internal/hass/sensor"
"github.com/joshuar/go-hass-agent/internal/hass/sensor/types"
"github.com/joshuar/go-hass-agent/internal/linux"
"github.com/joshuar/go-hass-agent/internal/logging"
"github.com/joshuar/go-hass-agent/internal/preferences"
)

const (
cpuUsageUpdateInterval = 10 * time.Second
cpuUsageUpdateJitter = 500 * time.Millisecond

cpuUsageWorkerID = "cpu_usage_sensors"

totalCPUString = "cpu"
)

var ErrParseCPUUsage = errors.New("could not parse CPU usage")

// UsagePrefs are the preferences for the CPU usage worker.
type UsagePrefs struct {
UpdateInterval string `toml:"update_interval" comment:"Time between updates of CPU usage sensors (default 10s)."`
preferences.CommonWorkerPrefs
}

type usageWorker struct {
boottime time.Time
rateSensors map[string]*rateSensor
path string
linux.PollingSensorWorker
clktck int64
delta time.Duration
}

func (w *usageWorker) UpdateDelta(delta time.Duration) {
w.delta = delta
}

func (w *usageWorker) Sensors(_ context.Context) ([]sensor.Entity, error) {
return w.getUsageStats()
}

func (w *usageWorker) PreferencesID() string {
return cpuUsageWorkerID
}

func (w *usageWorker) DefaultPreferences() UsagePrefs {
return UsagePrefs{
UpdateInterval: cpuUsageUpdateInterval.String(),
}
}

func NewUsageWorker(ctx context.Context) (*linux.PollingSensorWorker, error) {
var err error

worker := linux.NewPollingSensorWorker(cpuUsageWorkerID, cpuUsageUpdateInterval, cpuUsageUpdateJitter)

clktck, found := linux.CtxGetClkTck(ctx)
if !found {
return worker, fmt.Errorf("%w: no clktck value", linux.ErrInvalidCtx)
}

boottime, found := linux.CtxGetBoottime(ctx)
if !found {
return worker, fmt.Errorf("%w: no boottime value", linux.ErrInvalidCtx)
}

cpuUsageWorker := &usageWorker{
path: filepath.Join(linux.ProcFSRoot, "stat"),
boottime: boottime,
clktck: clktck,
rateSensors: map[string]*rateSensor{
"ctxt": newRateSensor("CPU Context Switch Rate", "mdi:counter", "ctx/s"),
"processes": newRateSensor("Processes Creation Rate", "mdi:application-cog", "processes/s"),
},
}

prefs, err := preferences.LoadWorker(ctx, cpuUsageWorker)
if err != nil {
return worker, fmt.Errorf("could not load preferences: %w", err)
}

// If disabled, don't use the addressWorker.
if prefs.Disabled {
return worker, nil
}

interval, err := time.ParseDuration(prefs.UpdateInterval)
if err != nil {
logging.FromContext(ctx).Warn("Could not parse update interval, using default value.",
slog.String("requested_value", prefs.UpdateInterval),
slog.String("default_value", cpuUsageUpdateInterval.String()))
// Save preferences with default interval value.
prefs.UpdateInterval = cpuUsageUpdateInterval.String()
if err := preferences.SaveWorker(ctx, cpuUsageWorker, *prefs); err != nil {
logging.FromContext(ctx).Warn("Could not save preferences.", slog.Any("error", err))
}

interval = cpuUsageUpdateInterval
}

worker.PollInterval = interval
worker.PollingSensorType = cpuUsageWorker

return worker, nil
}

func (w *usageWorker) getUsageStats() ([]sensor.Entity, error) {
var sensors []sensor.Entity

statsFH, err := os.Open(w.path)
if err != nil {
return nil, fmt.Errorf("fetch cpu usage: %w", err)
}

defer statsFH.Close()

statsFile := bufio.NewScanner(statsFH)
for statsFile.Scan() {
// Set up word scanner for line.
line := bufio.NewScanner(bytes.NewReader(statsFile.Bytes()))
line.Split(bufio.ScanWords)
// Split line by words
var cols []string
for line.Scan() {
cols = append(cols, line.Text())
}

if len(cols) == 0 {
return sensors, ErrParseCPUUsage
}
// Create a sensor depending on the line.
switch {
case cols[0] == totalCPUString:
sensors = append(sensors, newUsageSensor(w.clktck, cols, types.CategoryDefault))
case strings.Contains(cols[0], "cpu"):
sensors = append(sensors, newUsageSensor(w.clktck, cols, types.CategoryDiagnostic))
case cols[0] == "ctxt":
if _, found := w.rateSensors["ctxt"]; found {
w.rateSensors["ctxt"].update(w.delta, cols[1])
} else {
w.rateSensors["ctxt"] = newRateSensor("CPU Context Switch Rate", "mdi:counter", "ctx/s")
}

sensors = append(sensors, *w.rateSensors["ctxt"].Entity)
case cols[0] == "processes":
if _, found := w.rateSensors["processes"]; found {
w.rateSensors["processes"].update(w.delta, cols[1])
} else {
w.rateSensors["processes"] = newRateSensor("Processes Creation Rate", "mdi:application-cog", "processes/s")
}

sensors = append(sensors, *w.rateSensors["processes"].Entity)
case cols[0] == "procs_running":
sensors = append(sensors, newCountSensor("Processes Running", "mdi:application-cog", cols[1]))
case cols[0] == "procs_blocked":
sensors = append(sensors, newCountSensor("Processes Blocked", "mdi:application-cog", cols[1]))
}
}

return sensors, nil
}

//nolint:lll
var times = [...]string{"user_time", "nice_time", "system_time", "idle_time", "iowait_time", "irq_time", "softirq_time", "steal_time", "guest_time", "guest_nice_time"}

Expand Down
Loading

0 comments on commit cc18b67

Please sign in to comment.