hoststats: add package for collecting host statistics including CPU, memory, and disk usage #17038

Merged: 9 commits, May 30, 2023
3 changes: 3 additions & 0 deletions .changelog/17038.txt
@@ -0,0 +1,3 @@
```release-note:improvement
agent: add new metrics to track CPU, disk, and memory usage for server hosts (enabled by default)
```
3 changes: 3 additions & 0 deletions agent/config/builder.go
@@ -1106,6 +1106,9 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
LocalProxyConfigResyncInterval: 30 * time.Second,
}

// host metrics are enabled by default if consul is configured with HashiCorp Cloud Platform integration
rt.Telemetry.EnableHostMetrics = boolValWithDefault(c.Telemetry.EnableHostMetrics, rt.IsCloudEnabled())

rt.TLS, err = b.buildTLSConfig(rt, c.TLS)
if err != nil {
return RuntimeConfig{}, err
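Note: `boolValWithDefault` is an existing helper in builder.go, not shown in this hunk. A minimal sketch of its assumed semantics, which the HCP default above relies on:

```go
// Assumed behavior: an explicitly configured value wins; otherwise fall back
// to the default. With the default set to rt.IsCloudEnabled(), HCP-linked
// agents collect host metrics unless enable_host_metrics = false is set.
func boolValWithDefault(v *bool, defaultVal bool) bool {
	if v != nil {
		return *v
	}
	return defaultVal
}
```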
19 changes: 19 additions & 0 deletions agent/config/builder_test.go
@@ -556,3 +556,22 @@ func TestBuilder_parsePrefixFilter(t *testing.T) {
}
})
}

func TestBuilder_hostMetricsWithCloud(t *testing.T) {
devMode := true
builderOpts := LoadOpts{
DevMode: &devMode,
DefaultConfig: FileSource{
Name: "test",
Format: "hcl",
Data: `cloud{ resource_id = "abc" client_id = "abc" client_secret = "abc"}`,
},
}

result, err := Load(builderOpts)
require.NoError(t, err)
require.Empty(t, result.Warnings)
cfg := result.RuntimeConfig
require.NotNil(t, cfg)
require.True(t, cfg.Telemetry.EnableHostMetrics)
}
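A hypothetical companion test (not part of this diff) could cover the explicit opt-in path without an HCP cloud stanza:

```go
func TestBuilder_hostMetricsExplicit(t *testing.T) {
	devMode := true
	result, err := Load(LoadOpts{
		DevMode: &devMode,
		DefaultConfig: FileSource{
			Name:   "test",
			Format: "hcl",
			Data:   `telemetry { enable_host_metrics = true }`,
		},
	})
	require.NoError(t, err)
	require.Empty(t, result.Warnings)
	require.True(t, result.RuntimeConfig.Telemetry.EnableHostMetrics)
}
```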
1 change: 1 addition & 0 deletions agent/config/config.go
@@ -691,6 +691,7 @@ type Telemetry struct {
CirconusSubmissionInterval *string `mapstructure:"circonus_submission_interval" json:"circonus_submission_interval,omitempty"`
CirconusSubmissionURL *string `mapstructure:"circonus_submission_url" json:"circonus_submission_url,omitempty"`
DisableHostname *bool `mapstructure:"disable_hostname" json:"disable_hostname,omitempty"`
EnableHostMetrics *bool `mapstructure:"enable_host_metrics" json:"enable_host_metrics,omitempty"`
DogstatsdAddr *string `mapstructure:"dogstatsd_addr" json:"dogstatsd_addr,omitempty"`
DogstatsdTags []string `mapstructure:"dogstatsd_tags" json:"dogstatsd_tags,omitempty"`
RetryFailedConfiguration *bool `mapstructure:"retry_failed_connection" json:"retry_failed_connection,omitempty"`
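The new field is a `*bool` rather than `bool` so the builder can tell "not configured" apart from an explicit `false`. A hypothetical snippet illustrating the decode behavior with the same mapstructure library:

```go
// An absent key leaves EnableHostMetrics nil; an explicit false yields a
// non-nil pointer to false, which boolValWithDefault honors over the default.
var tel Telemetry
_ = mapstructure.Decode(map[string]interface{}{"enable_host_metrics": false}, &tel)
fmt.Println(tel.EnableHostMetrics != nil) // true: explicitly configured
```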
19 changes: 13 additions & 6 deletions agent/config/runtime_test.go
@@ -46,6 +46,7 @@ type testCase struct {
desc string
args []string
setup func() // TODO: accept a testing.T instead of panic
cleanup func()
expected func(rt *RuntimeConfig)
expectedErr string
expectedWarnings []string
@@ -2308,9 +2309,9 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
},
setup: func() {
os.Setenv("HCP_RESOURCE_ID", "env-id")
- t.Cleanup(func() {
- os.Unsetenv("HCP_RESOURCE_ID")
- })
},
+ cleanup: func() {
+ os.Unsetenv("HCP_RESOURCE_ID")
+ },
expected: func(rt *RuntimeConfig) {
rt.DataDir = dataDir
@@ -2321,6 +2322,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {

// server things
rt.ServerMode = true
rt.Telemetry.EnableHostMetrics = true
rt.TLS.ServerMode = true
rt.LeaveOnTerm = false
rt.SkipLeaveOnInt = true
@@ -2337,9 +2339,9 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
},
setup: func() {
os.Setenv("HCP_RESOURCE_ID", "env-id")
- t.Cleanup(func() {
- os.Unsetenv("HCP_RESOURCE_ID")
- })
},
+ cleanup: func() {
+ os.Unsetenv("HCP_RESOURCE_ID")
+ },
json: []string{`{
"cloud": {
@@ -2360,6 +2362,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {

// server things
rt.ServerMode = true
rt.Telemetry.EnableHostMetrics = true
rt.TLS.ServerMode = true
rt.LeaveOnTerm = false
rt.SkipLeaveOnInt = true
@@ -6032,6 +6035,9 @@ func (tc testCase) run(format string, dataDir string) func(t *testing.T) {
expected.ACLResolverSettings.EnterpriseMeta = *structs.NodeEnterpriseMetaInPartition(expected.PartitionOrDefault())

prototest.AssertDeepEqual(t, expected, actual, cmpopts.EquateEmpty())
if tc.cleanup != nil {
tc.cleanup()
}
}
}

@@ -6754,6 +6760,7 @@ func TestLoad_FullConfig(t *testing.T) {
Expiration: 15 * time.Second,
Name: "ftO6DySn", // notice this is the same as the metrics prefix
},
EnableHostMetrics: true,
},
TLS: tlsutil.Config{
InternalRPC: tlsutil.ProtocolConfig{
1 change: 1 addition & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
@@ -465,6 +465,7 @@
"DisableHostname": false,
"DogstatsdAddr": "",
"DogstatsdTags": [],
"EnableHostMetrics": false,
"FilterDefault": false,
"MetricsPrefix": "",
"PrometheusOpts": {
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.hcl
@@ -690,6 +690,7 @@ telemetry {
circonus_check_tags = "prvO4uBl"
circonus_submission_interval = "DolzaflP"
circonus_submission_url = "gTcbS93G"
enable_host_metrics = true
disable_hostname = true
dogstatsd_addr = "0wSndumK"
dogstatsd_tags = [ "3N81zSUB","Xtj8AnXZ" ]
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.json
@@ -808,6 +808,7 @@
"circonus_check_tags": "prvO4uBl",
"circonus_submission_interval": "DolzaflP",
"circonus_submission_url": "gTcbS93G",
"enable_host_metrics": true,
"disable_hostname": true,
"dogstatsd_addr": "0wSndumK",
"dogstatsd_tags": [
21 changes: 16 additions & 5 deletions agent/setup.go
@@ -4,6 +4,7 @@
package agent

import (
"context"
"fmt"
"io"
"net"
@@ -41,6 +42,7 @@
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/hoststats"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil"
)
@@ -59,6 +61,7 @@ type BaseDeps struct {
WatchedFiles []string

deregisterBalancer, deregisterResolver func()
stopHostCollector context.CancelFunc
}

type ConfigLoader func(source config.Source) (config.LoadResult, error)
@@ -117,6 +120,11 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
if err != nil {
return d, fmt.Errorf("failed to initialize telemetry: %w", err)
}
if !cfg.Telemetry.Disable && cfg.Telemetry.EnableHostMetrics {
ctx, cancel := context.WithCancel(context.Background())
hoststats.NewCollector(ctx, d.Logger, cfg.DataDir)
d.stopHostCollector = cancel
}

d.TLSConfigurator, err = tlsutil.NewConfigurator(cfg.TLS, d.Logger)
if err != nil {
@@ -214,11 +222,10 @@ func (bd BaseDeps) Close() {
bd.AutoConfig.Stop()
bd.MetricsConfig.Cancel()

- if fn := bd.deregisterBalancer; fn != nil {
- fn()
- }
- if fn := bd.deregisterResolver; fn != nil {
- fn()
- }
+ for _, fn := range []func(){bd.deregisterBalancer, bd.deregisterResolver, bd.stopHostCollector} {
+ if fn != nil {
+ fn()
+ }
+ }
}

@@ -297,6 +304,10 @@ func getPrometheusDefs(cfg *config.RuntimeConfig, isServer bool) ([]prometheus.G
serverGauges,
}

if cfg.Telemetry.EnableHostMetrics {
gauges = append(gauges, hoststats.Gauges)
}

// TODO(ffmmm): conditionally add only leader specific metrics to gauges, counters, summaries, etc
if isServer {
gauges = append(gauges,
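`hoststats.Gauges` is defined elsewhere in the new package and is not part of this diff. A plausible shape, following the `GaugeDefinition` convention of go-metrics' Prometheus sink (the metric names here are placeholders, not the actual ones):

```go
// Hypothetical gauge definitions; the real list lives in lib/hoststats.
var Gauges = []prometheus.GaugeDefinition{
	{Name: []string{"host", "uptime"}, Help: "Uptime of the host in seconds"},
	{Name: []string{"host", "memory", "used_percent"}, Help: "Percentage of host memory in use"},
	{Name: []string{"host", "disk", "used_percent"}, Help: "Percentage of the data dir disk in use"},
}
```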
189 changes: 189 additions & 0 deletions lib/hoststats/collector.go
@@ -0,0 +1,189 @@
package hoststats

import (
"context"
"fmt"
"math"
"runtime"
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/host"
"github.com/shirou/gopsutil/v3/mem"
)

// Collector collects host resource usage stats
type Collector struct {
numCores int
cpuCalculator map[string]*cpuStatsCalculator
hostStats *HostStats
hostStatsLock sync.RWMutex
dataDir string

metrics Metrics
baseLabels []metrics.Label

logger hclog.Logger
}

// NewCollector returns a Collector and starts its collection loop. The dataDir is passed in
// so that disk statistics can be reported for the mountpoint that contains the dataDir
func NewCollector(ctx context.Context, logger hclog.Logger, dataDir string, opts ...CollectorOption) *Collector {
logger = logger.Named("host_stats")
collector := initCollector(logger, dataDir, opts...)
go collector.loop(ctx)
return collector
}

// initCollector initializes the Collector but does not start the collection loop
func initCollector(logger hclog.Logger, dataDir string, opts ...CollectorOption) *Collector {
numCores := runtime.NumCPU()
statsCalculator := make(map[string]*cpuStatsCalculator)
collector := &Collector{
cpuCalculator: statsCalculator,
numCores: numCores,
logger: logger,
dataDir: dataDir,
}

for _, opt := range opts {
opt(collector)
}

if collector.metrics == nil {
collector.metrics = metrics.Default()
}
return collector
}

func (c *Collector) loop(ctx context.Context) {
// Start collecting host stats right away and then keep collecting every
// collection interval
next := time.NewTimer(0)
defer next.Stop()
for {
select {
case <-next.C:
c.collect()
next.Reset(hostStatsCollectionInterval)
c.Stats().Emit(c.metrics, c.baseLabels)

case <-ctx.Done():
return
}
}
}

// collect will collect stats related to resource usage of the host
func (c *Collector) collect() {
hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()}

// Determine up-time
uptime, err := host.Uptime()
if err != nil {
c.logger.Error("failed to collect uptime stats", "error", err)
uptime = 0
}
hs.Uptime = uptime

// Collect memory stats
mstats, err := c.collectMemoryStats()
if err != nil {
c.logger.Error("failed to collect memory stats", "error", err)
mstats = &MemoryStats{}
}
hs.Memory = mstats

// Collect cpu stats
cpus, err := c.collectCPUStats()
if err != nil {
c.logger.Error("failed to collect cpu stats", "error", err)
cpus = []*CPUStats{}
}
hs.CPU = cpus

// Collect disk stats
diskStats, err := c.collectDiskStats(c.dataDir)
if err != nil {
c.logger.Error("failed to collect dataDir disk stats", "error", err)
}
hs.DataDirStats = diskStats

// Update the collected status object.
c.hostStatsLock.Lock()
c.hostStats = hs
c.hostStatsLock.Unlock()
}

func (c *Collector) collectDiskStats(dir string) (*DiskStats, error) {
usage, err := disk.Usage(dir)
if err != nil {
return nil, fmt.Errorf("failed to collect disk usage stats: %w", err)
}
return c.toDiskStats(usage), nil
}

func (c *Collector) collectMemoryStats() (*MemoryStats, error) {
memStats, err := mem.VirtualMemory()
if err != nil {
return nil, err
}
mem := &MemoryStats{
Total: memStats.Total,
Available: memStats.Available,
Used: memStats.Used,
UsedPercent: memStats.UsedPercent,
Free: memStats.Free,
}

return mem, nil
}

// Stats returns the most recently collected host stats
func (c *Collector) Stats() *HostStats {
c.hostStatsLock.RLock()
defer c.hostStatsLock.RUnlock()

if c.hostStats == nil {
return &HostStats{}
}

return c.hostStats.Clone()
}

// toDiskStats converts a disk.UsageStat into a DiskStats, zeroing out NaN percentages
func (c *Collector) toDiskStats(usage *disk.UsageStat) *DiskStats {
ds := DiskStats{
Size: usage.Total,
Used: usage.Used,
Available: usage.Free,
UsedPercent: usage.UsedPercent,
InodesUsedPercent: usage.InodesUsedPercent,
Path: usage.Path,
}
if math.IsNaN(ds.UsedPercent) {
ds.UsedPercent = 0.0
}
if math.IsNaN(ds.InodesUsedPercent) {
ds.InodesUsedPercent = 0.0
}

return &ds
}

type CollectorOption func(c *Collector)

func WithMetrics(m *metrics.Metrics) CollectorOption {
return func(c *Collector) {
c.metrics = m
}
}

func WithBaseLabels(labels []metrics.Label) CollectorOption {
return func(c *Collector) {
c.baseLabels = labels
}
}
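For reference, wiring the collector up by hand looks roughly like this (the logger name, data dir, labels, and sleep are placeholders; the collection interval constant lives elsewhere in the package):

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // ends the collection loop, mirroring BaseDeps.Close

logger := hclog.New(&hclog.LoggerOptions{Name: "example"})
collector := hoststats.NewCollector(ctx, logger, "/var/lib/consul",
	hoststats.WithBaseLabels([]metrics.Label{{Name: "node", Value: "server-1"}}))

// Stats() clones under the read lock, so reads never race the loop goroutine.
time.Sleep(2 * time.Second) // allow at least one collection pass
fmt.Printf("uptime=%ds\n", collector.Stats().Uptime)
```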