Skip to content

Commit

Permalink
support collecting FsUsageMetrics for containerd
Browse files Browse the repository at this point in the history
  • Loading branch information
yyrdl committed Jun 24, 2021
1 parent 8795a0e commit d593fc1
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 50 deletions.
1 change: 1 addition & 0 deletions cmd/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,7 @@ k8s.io/component-base v0.20.6/go.mod h1:6f1MPBAeI+mvuts3sIdtpjljHWBQ2cIy38oBIWMY
k8s.io/cri-api v0.17.3/go.mod h1:X1sbHmuXhwaHs9xxYffLqJogVsnI+f6cPRcgPel7ywM=
k8s.io/cri-api v0.20.1/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI=
k8s.io/cri-api v0.20.4/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI=
k8s.io/cri-api v0.20.6 h1:iXX0K2pRrbR8yXbZtDK/bSnmg/uSqIFiVJK1x4LUOMc=
k8s.io/cri-api v0.20.6/go.mod h1:ew44AjNXwyn1s0U4xCKGodU7J1HzBeZ1MpGrpa5r8Yc=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
Expand Down
124 changes: 82 additions & 42 deletions container/common/fsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/google/cadvisor/fs"

"k8s.io/klog/v2"
)

Expand All @@ -37,15 +36,21 @@ type FsUsage struct {
InodeUsage uint64
}

type FsUsageProvider interface {
// Usage returns the fs usage
Usage() (*FsUsage, error)
// Targets returns where the fs usage metric is collected,it maybe a directory ,a file or some
// information about the snapshotter(for containerd)
Targets() []string
}

type realFsHandler struct {
sync.RWMutex
lastUpdate time.Time
usage FsUsage
period time.Duration
minPeriod time.Duration
rootfs string
extraDir string
fsInfo fs.FsInfo
lastUpdate time.Time
usage FsUsage
period time.Duration
minPeriod time.Duration
usageProvider FsUsageProvider
// Tells the container to stop.
stopChan chan struct{}
}
Expand All @@ -58,51 +63,33 @@ const DefaultPeriod = time.Minute

var _ FsHandler = &realFsHandler{}

func NewFsHandler(period time.Duration, rootfs, extraDir string, fsInfo fs.FsInfo) FsHandler {
func NewFsHandler(period time.Duration, provider FsUsageProvider) FsHandler {
return &realFsHandler{
lastUpdate: time.Time{},
usage: FsUsage{},
period: period,
minPeriod: period,
rootfs: rootfs,
extraDir: extraDir,
fsInfo: fsInfo,
stopChan: make(chan struct{}, 1),
lastUpdate: time.Time{},
usage: FsUsage{},
period: period,
minPeriod: period,
usageProvider: provider,
stopChan: make(chan struct{}, 1),
}
}

func (fh *realFsHandler) update() error {
var (
rootUsage, extraUsage fs.UsageInfo
rootErr, extraErr error
)
// TODO(vishh): Add support for external mounts.
if fh.rootfs != "" {
rootUsage, rootErr = fh.fsInfo.GetDirUsage(fh.rootfs)
}

if fh.extraDir != "" {
extraUsage, extraErr = fh.fsInfo.GetDirUsage(fh.extraDir)
usage, err := fh.usageProvider.Usage()

if err != nil {
return err
}

// Wait to handle errors until after all operartions are run.
// An error in one will not cause an early return, skipping others
fh.Lock()
defer fh.Unlock()
fh.lastUpdate = time.Now()
if fh.rootfs != "" && rootErr == nil {
fh.usage.InodeUsage = rootUsage.Inodes
fh.usage.BaseUsageBytes = rootUsage.Bytes
fh.usage.TotalUsageBytes = rootUsage.Bytes
}
if fh.extraDir != "" && extraErr == nil {
fh.usage.TotalUsageBytes += extraUsage.Bytes
}

// Combine errors into a single error to return
if rootErr != nil || extraErr != nil {
return fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr)
}
fh.usage.InodeUsage = usage.InodeUsage
fh.usage.BaseUsageBytes = usage.BaseUsageBytes
fh.usage.TotalUsageBytes = usage.TotalUsageBytes

return nil
}

Expand All @@ -125,7 +112,8 @@ func (fh *realFsHandler) trackUsage() {
// if the long duration is persistent either because of slow
// disk or lots of containers.
longOp = longOp + time.Second
klog.V(2).Infof("fs: disk usage and inodes count on following dirs took %v: %v; will not log again for this container unless duration exceeds %v", duration, []string{fh.rootfs, fh.extraDir}, longOp)
klog.V(2).Infof(`fs: disk usage and inodes count on targets took %v: %v; `+
`will not log again for this container unless duration exceeds %v`, duration, fh.usageProvider.Targets(), longOp)
}
select {
case <-fh.stopChan:
Expand All @@ -148,3 +136,55 @@ func (fh *realFsHandler) Usage() FsUsage {
defer fh.RUnlock()
return fh.usage
}

type fsUsageProvider struct {
fsInfo fs.FsInfo
rootFs string
extraDir string
}

func NewGeneralFsUsageProvider(fsInfo fs.FsInfo, rootFs, extraDir string) FsUsageProvider {
return &fsUsageProvider{
fsInfo: fsInfo,
rootFs: rootFs,
extraDir: extraDir,
}
}

func (f *fsUsageProvider) Targets() []string {
return []string{f.rootFs, f.extraDir}
}

func (f *fsUsageProvider) Usage() (*FsUsage, error) {
var (
rootUsage, extraUsage fs.UsageInfo
rootErr, extraErr error
)

if f.rootFs != "" {
rootUsage, rootErr = f.fsInfo.GetDirUsage(f.rootFs)
}

if f.extraDir != "" {
extraUsage, extraErr = f.fsInfo.GetDirUsage(f.extraDir)
}

usage := &FsUsage{}

if f.rootFs != "" && rootErr == nil {
usage.InodeUsage = rootUsage.Inodes
usage.BaseUsageBytes = rootUsage.Bytes
usage.TotalUsageBytes = rootUsage.Bytes
}

if f.extraDir != "" && extraErr == nil {
usage.TotalUsageBytes += extraUsage.Bytes
}

// Combine errors into a single error to return
if rootErr != nil || extraErr != nil {
return nil, fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr)
}

return usage, nil
}
32 changes: 32 additions & 0 deletions container/containerd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,24 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/pkg/dialer"
ptypes "github.com/gogo/protobuf/types"
"github.com/google/cadvisor/container/common"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

type client struct {
containerService containersapi.ContainersClient
taskService tasksapi.TasksClient
versionService versionapi.VersionClient
runtimeservice runtimeapi.RuntimeServiceClient
}

type ContainerdClient interface {
LoadContainer(ctx context.Context, id string) (*containers.Container, error)
TaskPid(ctx context.Context, id string) (uint32, error)
Version(ctx context.Context) (string, error)
ContainerFsUsage(ctx context.Context, containerID string) (*common.FsUsage, error)
}

var once sync.Once
Expand Down Expand Up @@ -92,6 +96,7 @@ func Client(address, namespace string) (ContainerdClient, error) {
containerService: containersapi.NewContainersClient(conn),
taskService: tasksapi.NewTasksClient(conn),
versionService: versionapi.NewVersionClient(conn),
runtimeservice: runtimeapi.NewRuntimeServiceClient(conn),
}
})
return ctrdClient, retErr
Expand Down Expand Up @@ -125,6 +130,33 @@ func (c *client) Version(ctx context.Context) (string, error) {
return response.Version, nil
}

func (c *client) ContainerFsUsage(ctx context.Context, containerID string) (*common.FsUsage, error) {
// containerd has cached the disk usage metrics in memory
rsp, err := c.runtimeservice.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{
ContainerId: containerID,
})

if err != nil {
return nil, err
}

if rsp.Stats == nil || rsp.Stats.WritableLayer == nil {
return nil, fmt.Errorf("container disk usage stats for container (%s) not found", containerID)
}

usage := &common.FsUsage{}

if rsp.Stats.WritableLayer.UsedBytes != nil {
usage.BaseUsageBytes = rsp.Stats.WritableLayer.UsedBytes.Value
usage.TotalUsageBytes = rsp.Stats.WritableLayer.UsedBytes.Value
}

if rsp.Stats.WritableLayer.InodesUsed != nil {
usage.InodeUsage = rsp.Stats.WritableLayer.InodesUsed.Value
}

return usage, nil
}
func containerFromProto(containerpb containersapi.Container) *containers.Container {
var runtime containers.RuntimeInfo
if containerpb.Runtime != nil {
Expand Down
6 changes: 6 additions & 0 deletions container/containerd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/containerd/containerd/containers"
"github.com/google/cadvisor/container/common"
)

type containerdClientMock struct {
Expand All @@ -45,6 +46,11 @@ func (c *containerdClientMock) TaskPid(ctx context.Context, id string) (uint32,
return 2389, nil
}

func (c *containerdClientMock) ContainerFsUsage(ctx context.Context,
containerdID string) (*common.FsUsage, error) {
return &common.FsUsage{}, nil
}

func mockcontainerdClient(cntrs map[string]*containers.Container, returnErr error) ContainerdClient {
return &containerdClientMock{
cntrs: cntrs,
Expand Down
57 changes: 51 additions & 6 deletions container/containerd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

// Handler for containerd containers.

package containerd

import (
Expand All @@ -38,6 +39,7 @@ type containerdContainerHandler struct {
// (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test")
cgroupPaths map[string]string
fsInfo fs.FsInfo
fsHandler common.FsHandler
// Metadata associated with the container.
reference info.ContainerReference
envs map[string]string
Expand Down Expand Up @@ -122,9 +124,14 @@ func newContainerdContainerHandler(
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootfs, int(taskPid), includedMetrics)

handler := &containerdContainerHandler{
machineInfoFactory: machineInfoFactory,
cgroupPaths: cgroupPaths,
fsInfo: fsInfo,
machineInfoFactory: machineInfoFactory,
cgroupPaths: cgroupPaths,
fsInfo: fsInfo,
fsHandler: common.NewFsHandler(common.DefaultPeriod, &fsUsageProvider{
ctx: ctx,
client: client,
containerID: cntr.ID,
}),
envs: make(map[string]string),
labels: cntr.Labels,
includedMetrics: includedMetrics,
Expand Down Expand Up @@ -169,9 +176,7 @@ func (h *containerdContainerHandler) needNet() bool {
}

func (h *containerdContainerHandler) GetSpec() (info.ContainerSpec, error) {
// TODO: Since we dont collect disk usage stats for containerd, we set hasFilesystem
// to false. Revisit when we support disk usage stats for containerd
hasFilesystem := false
hasFilesystem := true
spec, err := common.GetSpec(h.cgroupPaths, h.machineInfoFactory, h.needNet(), hasFilesystem)
spec.Labels = h.labels
spec.Envs = h.envs
Expand All @@ -189,6 +194,26 @@ func (h *containerdContainerHandler) getFsStats(stats *info.ContainerStats) erro
if h.includedMetrics.Has(container.DiskIOMetrics) {
common.AssignDeviceNamesToDiskStats((*common.MachineInfoNamer)(mi), &stats.DiskIo)
}
if !h.includedMetrics.Has(container.DiskUsageMetrics) {
return nil
}

// TODO(yyrdl):for overlay ,the 'upperPath' is:
// `${containerd.Config.Root}/io.containerd.snapshotter.v1.overlayfs/snapshots/${snapshots.ID}/fs`,
// and for other snapshots plugins, we can also find the law from containerd's source code.

// Device 、fsType and fsLimits and other information are not supported yet, unless there is a way to
// know the id of the snapshot , or the `Stat`(snapshotsClient.Stat) method returns these information directly.
// And containerd has cached the disk usage stats in memory,so the best way is enhancing containerd's API
// (avoid collecting disk usage metrics twice)
fsStat := info.FsStats{}
usage := h.fsHandler.Usage()
fsStat.BaseUsage = usage.BaseUsageBytes
fsStat.Usage = usage.TotalUsageBytes
fsStat.Inodes = usage.InodeUsage

stats.Filesystem = append(stats.Filesystem, fsStat)

return nil
}

Expand Down Expand Up @@ -239,12 +264,32 @@ func (h *containerdContainerHandler) Type() container.ContainerType {
}

func (h *containerdContainerHandler) Start() {
if h.fsHandler != nil {
h.fsHandler.Start()
}
}

func (h *containerdContainerHandler) Cleanup() {
if h.fsHandler != nil {
h.fsHandler.Stop()
}
}

func (h *containerdContainerHandler) GetContainerIPAddress() string {
// containerd doesnt take care of networking.So it doesnt maintain networking states
return ""
}

type fsUsageProvider struct {
ctx context.Context
containerID string
client ContainerdClient
}

func (f *fsUsageProvider) Usage() (*common.FsUsage, error) {
return f.client.ContainerFsUsage(f.ctx, f.containerID)
}

func (f *fsUsageProvider) Targets() []string {
return []string{fmt.Sprintf("containerd %s", f.containerID)}
}
3 changes: 2 additions & 1 deletion container/crio/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ func newCrioContainerHandler(

// we optionally collect disk usage metrics
if includedMetrics.Has(container.DiskUsageMetrics) {
handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, storageLogDir, fsInfo)
handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider(
fsInfo, rootfsStorageDir, storageLogDir))
}
// TODO for env vars we wanted to show from container.Config.Env from whitelist
//for _, exposedEnv := range metadataEnvs {
Expand Down
3 changes: 2 additions & 1 deletion container/docker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ func newDockerContainerHandler(

if includedMetrics.Has(container.DiskUsageMetrics) {
handler.fsHandler = &dockerFsHandler{
fsHandler: common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, otherStorageDir, fsInfo),
fsHandler: common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider(
fsInfo, rootfsStorageDir, otherStorageDir)),
thinPoolWatcher: thinPoolWatcher,
zfsWatcher: zfsWatcher,
deviceID: ctnr.GraphDriver.Data["DeviceId"],
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887
google.golang.org/grpc v1.33.2
k8s.io/cri-api v0.20.6
k8s.io/klog/v2 v2.4.0
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
)
Loading

0 comments on commit d593fc1

Please sign in to comment.