Skip to content

Commit

Permalink
Remote global hostfs variables (#29138)
Browse files Browse the repository at this point in the history
* first pass at fixing hostfs

* forgot to commit a bunch of stuff, oops

* fix system/linux tests

* fix tests

* fix other tests

* fix diskio tests

* take another pass at fixing diskio tests

* revert to non-breaking change

* fix docs

* fix metricbeat legacy config logic, fix a few tests

* fix import

* fix libraries, docs

* fix processors

* few code changes
  • Loading branch information
fearful-symmetry authored Dec 9, 2021
1 parent 70ac1b0 commit da8bc7c
Show file tree
Hide file tree
Showing 97 changed files with 545 additions and 556 deletions.
12 changes: 1 addition & 11 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/elastic/beats/v7/libbeat/cloudid"
"github.com/elastic/beats/v7/libbeat/cmd/instance/metrics"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/common/seccomp"
Expand Down Expand Up @@ -1130,22 +1129,13 @@ func initPaths(cfg *common.Config) error {
// the paths field. After we will unpack the complete configuration and keystore reference
// will be correctly replaced.
partialConfig := struct {
Path paths.Path `config:"path"`
Hostfs string `config:"system.hostfs"`
Path paths.Path `config:"path"`
}{}

if paths.IsCLISet() {
cfgwarn.Deprecate("8.0.0", "This flag will be removed in the future and replaced by a config value.")
}

if err := cfg.Unpack(&partialConfig); err != nil {
return fmt.Errorf("error extracting default paths: %+v", err)
}

// Read the value for hostfs as `system.hostfs`
// In the config, there is no `path.hostfs`, as we're merely using the path struct to carry the hostfs variable.
partialConfig.Path.Hostfs = partialConfig.Hostfs

if err := paths.InitPaths(&partialConfig.Path); err != nil {
return fmt.Errorf("error setting default paths: %+v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"github.com/elastic/beats/v7/libbeat/metric/system/cpu"
"github.com/elastic/beats/v7/libbeat/metric/system/numcpu"
"github.com/elastic/beats/v7/libbeat/metric/system/process"
"github.com/elastic/beats/v7/libbeat/metric/system/resolve"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/paths"
)

var (
Expand Down Expand Up @@ -288,7 +288,7 @@ func reportBeatCgroups(_ monitoring.Mode, V monitoring.Visitor) {
}

cgroups, err := cgroup.NewReaderOptions(cgroup.ReaderOptions{
RootfsMountpoint: paths.Paths.Hostfs,
RootfsMountpoint: resolve.NewTestResolver("/"),
IgnoreRootCgroups: true,
CgroupsHierarchyOverride: os.Getenv(libbeatMonitoringCgroupsHierarchyOverride),
})
Expand Down
1 change: 0 additions & 1 deletion libbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func GenRootCmdWithSettings(beatCreator beat.Creator, settings instance.Settings
rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.data"))
rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.logs"))
rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.home"))
rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("system.hostfs"))
rootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("strict.perms"))
if f := flag.CommandLine.Lookup("plugin"); f != nil {
rootCmd.PersistentFlags().AddGoFlag(f)
Expand Down
3 changes: 2 additions & 1 deletion libbeat/docs/command-reference.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,8 @@ endif::[]

*`--system.hostfs MOUNT_POINT`*::

Specifies the mount point of the host's filesystem for use in monitoring a host.
Specifies the mount point of the host's filesystem for use in monitoring a host.
This flag is depricated, and an alternate hostfs should be specified via the `hostfs` module config value.


ifeval::["{beatname_lc}"=="packetbeat"]
Expand Down
7 changes: 4 additions & 3 deletions libbeat/docs/shared-path-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,20 @@ Example:
path.logs: /var/log/beats
------------------------------------------------------------------------------

ifeval::["{beatname_lc}"=="metricbeat"]
[float]
==== `system.hostfs`

Specifies the mount point of the host's filesystem for use in monitoring a host.
This can either be set in the config, or with the `--system.hostfs` CLI flag. This is used for cgroup self-monitoring.
ifeval::["{beatname_lc}"=="metricbeat"]
This is also used by the system module to read files from `/proc` and `/sys`.
endif::[]

This is also used by the system module to read files from `/proc` and `/sys`.
This option is deprecated and will be removed in a future release. To set the filesystem root, use the `hostfs` flag inside the module-level config.

Example:

[source,yaml]
------------------------------------------------------------------------------
system.hostfs: /mount/rootfs
------------------------------------------------------------------------------
endif::[]
19 changes: 9 additions & 10 deletions libbeat/metric/system/cgroup/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"fmt"
"io/ioutil"
"path/filepath"
"strconv"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/metric/system/cgroup/cgv1"
"github.com/elastic/beats/v7/libbeat/metric/system/cgroup/cgv2"
"github.com/elastic/beats/v7/libbeat/metric/system/resolve"
)

// StatsV1 contains metrics and limits from each of the cgroup subsystems.
Expand Down Expand Up @@ -71,7 +73,7 @@ type mount struct {
type Reader struct {
// Mountpoint of the root filesystem. Defaults to / if not set. This can be
// useful for example if you mount / as /rootfs inside of a container.
rootfsMountpoint string
rootfsMountpoint resolve.Resolver
ignoreRootCgroups bool // Ignore a cgroup when its path is "/".
cgroupsHierarchyOverride string
cgroupMountpoints Mountpoints // Mountpoints for each subsystem (e.g. cpu, cpuacct, memory, blkio).
Expand All @@ -81,8 +83,8 @@ type Reader struct {
type ReaderOptions struct {
// RootfsMountpoint holds the mountpoint of the root filesystem.
//
// If unspecified, "/" is assumed.
RootfsMountpoint string
// pass
RootfsMountpoint resolve.Resolver

// IgnoreRootCgroups ignores cgroup subsystem with the path "/".
IgnoreRootCgroups bool
Expand All @@ -98,7 +100,7 @@ type ReaderOptions struct {
}

// NewReader creates and returns a new Reader.
func NewReader(rootfsMountpoint string, ignoreRootCgroups bool) (*Reader, error) {
func NewReader(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (*Reader, error) {
return NewReaderOptions(ReaderOptions{
RootfsMountpoint: rootfsMountpoint,
IgnoreRootCgroups: ignoreRootCgroups,
Expand All @@ -107,10 +109,6 @@ func NewReader(rootfsMountpoint string, ignoreRootCgroups bool) (*Reader, error)

// NewReaderOptions creates and returns a new Reader with the given options.
func NewReaderOptions(opts ReaderOptions) (*Reader, error) {
if opts.RootfsMountpoint == "" {
opts.RootfsMountpoint = "/"
}

// Determine what subsystems are supported by the kernel.
subsystems, err := SupportedSubsystems(opts.RootfsMountpoint)
// We can return a not-quite-an-error ErrCgroupsMissing here, so return the bare error.
Expand All @@ -134,7 +132,8 @@ func NewReaderOptions(opts ReaderOptions) (*Reader, error) {

// CgroupsVersion reports if the given PID is attached to a V1 or V2 controller
func (r *Reader) CgroupsVersion(pid int) (CgroupsVersion, error) {
cgPath := filepath.Join(r.rootfsMountpoint, "/proc/", fmt.Sprintf("%d", pid), "cgroup")
cgPath := filepath.Join("/proc/", strconv.Itoa(pid), "cgroup")
cgPath = r.rootfsMountpoint.ResolveHostFS(cgPath)
cgraw, err := ioutil.ReadFile(cgPath)
if err != nil {
return CgroupsV1, errors.Wrapf(err, "error reading %s", cgPath)
Expand Down Expand Up @@ -227,7 +226,7 @@ func (r *Reader) GetV2StatsForProcess(pid int) (*StatsV2, error) {

// ProcessCgroupPaths is a wrapper around Reader.ProcessCgroupPaths for libraries that only need the slimmer functionality from
// the gosigar cgroups code. This does not have the same function signature, and consumers still need to distinguish between v1 and v2 cgroups.
func ProcessCgroupPaths(hostfs string, pid int) (PathList, error) {
func ProcessCgroupPaths(hostfs resolve.Resolver, pid int) (PathList, error) {
reader, err := NewReader(hostfs, false)
if err != nil {
return PathList{}, errors.Wrap(err, "error creating cgroups reader")
Expand Down
10 changes: 6 additions & 4 deletions libbeat/metric/system/cgroup/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/metric/system/resolve"
)

const (
Expand All @@ -31,7 +33,7 @@ const (
)

func TestReaderGetStatsV1(t *testing.T) {
reader, err := NewReader("testdata/docker", true)
reader, err := NewReader(resolve.NewTestResolver("testdata/docker"), true)
assert.NoError(t, err, "error in NewReader")

stats, err := reader.GetV1StatsForProcess(985)
Expand Down Expand Up @@ -67,7 +69,7 @@ func TestReaderGetStatsV1(t *testing.T) {
}

func TestReaderGetStatsV2(t *testing.T) {
reader, err := NewReader("testdata/docker", true)
reader, err := NewReader(resolve.NewTestResolver("testdata/docker"), true)
assert.NoError(t, err, "error in NewReader")

stats, err := reader.GetV2StatsForProcess(312)
Expand All @@ -93,7 +95,7 @@ func TestReaderGetStatsHierarchyOverride(t *testing.T) {
// within a Docker container.

reader, err := NewReaderOptions(ReaderOptions{
RootfsMountpoint: "testdata/docker",
RootfsMountpoint: resolve.NewTestResolver("testdata/docker"),
IgnoreRootCgroups: false,
CgroupsHierarchyOverride: "/",
})
Expand All @@ -113,7 +115,7 @@ func TestReaderGetStatsHierarchyOverride(t *testing.T) {
assert.NotZero(t, stats.CPU.CFS.Shares)

reader2, err := NewReaderOptions(ReaderOptions{
RootfsMountpoint: "testdata/docker",
RootfsMountpoint: resolve.NewTestResolver("testdata/docker"),
IgnoreRootCgroups: true,
CgroupsHierarchyOverride: "/system.slice/",
})
Expand Down
31 changes: 13 additions & 18 deletions libbeat/metric/system/cgroup/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/metric/system/resolve"
)

var (
Expand Down Expand Up @@ -119,12 +120,8 @@ func parseMountinfoLine(line string) (mountinfo, error) {

// SupportedSubsystems returns the subsystems that are supported by the
// kernel. The returned map contains a entry for each subsystem.
func SupportedSubsystems(rootfsMountpoint string) (map[string]struct{}, error) {
if rootfsMountpoint == "" {
rootfsMountpoint = "/"
}

cgroups, err := os.Open(filepath.Join(rootfsMountpoint, "proc", "cgroups"))
func SupportedSubsystems(rootfs resolve.Resolver) (map[string]struct{}, error) {
cgroups, err := os.Open(rootfs.ResolveHostFS("/proc/cgroups"))
if err != nil {
if os.IsNotExist(err) {
return nil, ErrCgroupsMissing
Expand Down Expand Up @@ -171,12 +168,9 @@ func SupportedSubsystems(rootfsMountpoint string) (map[string]struct{}, error) {
// SubsystemMountpoints returns the mountpoints for each of the given subsystems.
// The returned map contains the subsystem name as a key and the value is the
// mountpoint.
func SubsystemMountpoints(rootfsMountpoint string, subsystems map[string]struct{}) (Mountpoints, error) {
if rootfsMountpoint == "" {
rootfsMountpoint = "/"
}
func SubsystemMountpoints(rootfs resolve.Resolver, subsystems map[string]struct{}) (Mountpoints, error) {

mountinfo, err := os.Open(filepath.Join(rootfsMountpoint, "proc", "self", "mountinfo"))
mountinfo, err := os.Open(rootfs.ResolveHostFS("/proc/self/mountinfo"))
if err != nil {
return Mountpoints{}, err
}
Expand All @@ -199,7 +193,8 @@ func SubsystemMountpoints(rootfsMountpoint string, subsystems map[string]struct{
return Mountpoints{}, err
}

if !strings.HasPrefix(mount.mountpoint, rootfsMountpoint) {
// if the mountpoint from the subsystem has a different root than ours, it probably belongs to something else.
if !strings.HasPrefix(mount.mountpoint, rootfs.ResolveHostFS("")) {
continue
}

Expand Down Expand Up @@ -237,7 +232,8 @@ func SubsystemMountpoints(rootfsMountpoint string, subsystems map[string]struct{
// ProcessCgroupPaths returns the cgroups to which a process belongs and the
// pathname of the cgroup relative to the mountpoint of the subsystem.
func (r Reader) ProcessCgroupPaths(pid int) (PathList, error) {
cgroup, err := os.Open(filepath.Join(r.rootfsMountpoint, "proc", strconv.Itoa(pid), "cgroup"))
cgroupPath := filepath.Join("proc", strconv.Itoa(pid), "cgroup")
cgroup, err := os.Open(r.rootfsMountpoint.ResolveHostFS(cgroupPath))
if err != nil {
return PathList{}, err //return a blank error so other events can use any file not found errors
}
Expand Down Expand Up @@ -271,15 +267,14 @@ func (r Reader) ProcessCgroupPaths(pid int) (PathList, error) {
// For this very annoying edge case, revert to the hostfs flag
// If it's not set, warn the user that they've hit this.
controllerPath := filepath.Join(r.cgroupMountpoints.V2Loc, path)
// Depending on the test environment, Hostfs can either be blank, or `/`
if r.cgroupMountpoints.V2Loc == "" && len(r.rootfsMountpoint) <= 1 {
if r.cgroupMountpoints.V2Loc == "" && !r.rootfsMountpoint.IsSet() {
logp.L().Debugf(`PID %d contains a cgroups V2 path (%s) but no V2 mountpoint was found.
This may be because metricbeat is running inside a container on a hybrid system.
To monitor cgroups V2 processess in this way, mount the unified (V2) hierarchy inside
the container as /sys/fs/cgroup/unified and start metricbeat with --system.hostfs.`, pid, line)
the container as /sys/fs/cgroup/unified and start the system module with the hostfs setting.`, pid, line)
continue
} else if r.cgroupMountpoints.V2Loc == "" && len(r.rootfsMountpoint) > 1 {
controllerPath = filepath.Join(r.rootfsMountpoint, "/sys/fs/cgroup/unified", path)
} else if r.cgroupMountpoints.V2Loc == "" && r.rootfsMountpoint.IsSet() {
controllerPath = r.rootfsMountpoint.ResolveHostFS(filepath.Join("/sys/fs/cgroup/unified", path))
}

cgpaths, err := ioutil.ReadDir(controllerPath)
Expand Down
12 changes: 7 additions & 5 deletions libbeat/metric/system/cgroup/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/metric/system/resolve"
)

const dockerTestData = "testdata/docker.zip"
Expand Down Expand Up @@ -103,7 +105,7 @@ func exists(path string) (bool, error) {
}

func TestSupportedSubsystems(t *testing.T) {
subsystems, err := SupportedSubsystems("testdata/docker")
subsystems, err := SupportedSubsystems(resolve.NewTestResolver("testdata/docker"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -126,7 +128,7 @@ func TestSupportedSubsystems(t *testing.T) {
}

func TestSupportedSubsystemsErrCgroupsMissing(t *testing.T) {
_, err := SupportedSubsystems("testdata/doesnotexist")
_, err := SupportedSubsystems(resolve.NewTestResolver("testdata/doesnotexist"))
if err != ErrCgroupsMissing {
t.Fatalf("expected ErrCgroupsMissing, but got %v", err)
}
Expand All @@ -144,7 +146,7 @@ func TestSubsystemMountpoints(t *testing.T) {
subsystems["memory"] = struct{}{}
subsystems["perf_event"] = struct{}{}

mountpoints, err := SubsystemMountpoints("testdata/docker", subsystems)
mountpoints, err := SubsystemMountpoints(resolve.NewTestResolver("testdata/docker"), subsystems)
if err != nil {
t.Fatal(err)
}
Expand All @@ -161,7 +163,7 @@ func TestSubsystemMountpoints(t *testing.T) {
}

func TestProcessCgroupPaths(t *testing.T) {
reader, err := NewReader("testdata/docker", false)
reader, err := NewReader(resolve.NewTestResolver("testdata/docker"), false)
if err != nil {
t.Fatalf("error in NewReader: %s", err)
}
Expand All @@ -185,7 +187,7 @@ func TestProcessCgroupPaths(t *testing.T) {
}

func TestProcessCgroupPathsV2(t *testing.T) {
reader, err := NewReader("testdata/docker", false)
reader, err := NewReader(resolve.NewTestResolver("testdata/docker"), false)
if err != nil {
t.Fatalf("error in NewReader: %s", err)
}
Expand Down
10 changes: 3 additions & 7 deletions libbeat/metric/system/numcpu/cpu_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/paths"
)

// getCPU implements NumCPU on linux
Expand All @@ -45,20 +42,19 @@ func getCPU() (int, bool, error) {
if isPresent {
cpuPath = "/sys/devices/system/cpu/present"
}
sysfspath := filepath.Join(paths.Paths.Hostfs, cpuPath)

rawFile, err := ioutil.ReadFile(sysfspath)
rawFile, err := ioutil.ReadFile(cpuPath)
// if the file doesn't exist, assume it's a support issue and not a bug
if errors.Is(err, os.ErrNotExist) {
return -1, false, nil
}
if err != nil {
return -1, false, errors.Wrapf(err, "error reading file %s", sysfspath)
return -1, false, errors.Wrapf(err, "error reading file %s", cpuPath)
}

cpuCount, err := parseCPUList(string(rawFile))
if err != nil {
return -1, false, errors.Wrapf(err, "error parsing file %s", sysfspath)
return -1, false, errors.Wrapf(err, "error parsing file %s", cpuPath)
}
return cpuCount, true, nil
}
Expand Down
1 change: 1 addition & 0 deletions libbeat/metric/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func (procStats *Stats) Init() error {
cgReader, err := cgroup.NewReaderOptions(procStats.CgroupOpts)
if err == cgroup.ErrCgroupsMissing {
logp.Warn("cgroup data collection will be disabled: %v", err)
procStats.EnableCgroups = false
} else if err != nil {
return errors.Wrap(err, "error initializing cgroup reader")
}
Expand Down
Loading

0 comments on commit da8bc7c

Please sign in to comment.