From d593fc12ba96325f9ca7087e169263b4cba9d749 Mon Sep 17 00:00:00 2001 From: yyrdl Date: Thu, 24 Jun 2021 10:42:41 +0800 Subject: [PATCH] support collecting FsUsageMetrics for containerd --- cmd/go.sum | 1 + container/common/fsHandler.go | 124 ++++++++++++++++++---------- container/containerd/client.go | 32 +++++++ container/containerd/client_test.go | 6 ++ container/containerd/handler.go | 57 +++++++++++-- container/crio/handler.go | 3 +- container/docker/handler.go | 3 +- go.mod | 1 + go.sum | 1 + 9 files changed, 178 insertions(+), 50 deletions(-) diff --git a/cmd/go.sum b/cmd/go.sum index 2502faac477..3bd6ed789b9 100644 --- a/cmd/go.sum +++ b/cmd/go.sum @@ -1227,6 +1227,7 @@ k8s.io/component-base v0.20.6/go.mod h1:6f1MPBAeI+mvuts3sIdtpjljHWBQ2cIy38oBIWMY k8s.io/cri-api v0.17.3/go.mod h1:X1sbHmuXhwaHs9xxYffLqJogVsnI+f6cPRcgPel7ywM= k8s.io/cri-api v0.20.1/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI= k8s.io/cri-api v0.20.4/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI= +k8s.io/cri-api v0.20.6 h1:iXX0K2pRrbR8yXbZtDK/bSnmg/uSqIFiVJK1x4LUOMc= k8s.io/cri-api v0.20.6/go.mod h1:ew44AjNXwyn1s0U4xCKGodU7J1HzBeZ1MpGrpa5r8Yc= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= diff --git a/container/common/fsHandler.go b/container/common/fsHandler.go index 5b506b04a1e..c5e4fc5ee6e 100644 --- a/container/common/fsHandler.go +++ b/container/common/fsHandler.go @@ -21,7 +21,6 @@ import ( "time" "github.com/google/cadvisor/fs" - "k8s.io/klog/v2" ) @@ -37,15 +36,21 @@ type FsUsage struct { InodeUsage uint64 } +type FsUsageProvider interface { + // Usage returns the fs usage + Usage() (*FsUsage, error) + // Targets returns where the fs usage metric is collected,it maybe a directory ,a file or some + // information about the snapshotter(for containerd) + Targets() []string +} + type realFsHandler struct { sync.RWMutex - lastUpdate time.Time - usage FsUsage - period time.Duration - minPeriod time.Duration - rootfs string - extraDir string - fsInfo fs.FsInfo + lastUpdate time.Time + usage FsUsage + period time.Duration + minPeriod time.Duration + usageProvider FsUsageProvider // Tells the container to stop. stopChan chan struct{} } @@ -58,51 +63,33 @@ const DefaultPeriod = time.Minute var _ FsHandler = &realFsHandler{} -func NewFsHandler(period time.Duration, rootfs, extraDir string, fsInfo fs.FsInfo) FsHandler { +func NewFsHandler(period time.Duration, provider FsUsageProvider) FsHandler { return &realFsHandler{ - lastUpdate: time.Time{}, - usage: FsUsage{}, - period: period, - minPeriod: period, - rootfs: rootfs, - extraDir: extraDir, - fsInfo: fsInfo, - stopChan: make(chan struct{}, 1), + lastUpdate: time.Time{}, + usage: FsUsage{}, + period: period, + minPeriod: period, + usageProvider: provider, + stopChan: make(chan struct{}, 1), } } func (fh *realFsHandler) update() error { - var ( - rootUsage, extraUsage fs.UsageInfo - rootErr, extraErr error - ) - // TODO(vishh): Add support for external mounts. - if fh.rootfs != "" { - rootUsage, rootErr = fh.fsInfo.GetDirUsage(fh.rootfs) - } - if fh.extraDir != "" { - extraUsage, extraErr = fh.fsInfo.GetDirUsage(fh.extraDir) + usage, err := fh.usageProvider.Usage() + + if err != nil { + return err } - // Wait to handle errors until after all operartions are run. - // An error in one will not cause an early return, skipping others fh.Lock() defer fh.Unlock() fh.lastUpdate = time.Now() - if fh.rootfs != "" && rootErr == nil { - fh.usage.InodeUsage = rootUsage.Inodes - fh.usage.BaseUsageBytes = rootUsage.Bytes - fh.usage.TotalUsageBytes = rootUsage.Bytes - } - if fh.extraDir != "" && extraErr == nil { - fh.usage.TotalUsageBytes += extraUsage.Bytes - } - // Combine errors into a single error to return - if rootErr != nil || extraErr != nil { - return fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr) - } + fh.usage.InodeUsage = usage.InodeUsage + fh.usage.BaseUsageBytes = usage.BaseUsageBytes + fh.usage.TotalUsageBytes = usage.TotalUsageBytes + return nil } @@ -125,7 +112,8 @@ func (fh *realFsHandler) trackUsage() { // if the long duration is persistent either because of slow // disk or lots of containers. longOp = longOp + time.Second - klog.V(2).Infof("fs: disk usage and inodes count on following dirs took %v: %v; will not log again for this container unless duration exceeds %v", duration, []string{fh.rootfs, fh.extraDir}, longOp) + klog.V(2).Infof(`fs: disk usage and inodes count on targets took %v: %v; `+ + `will not log again for this container unless duration exceeds %v`, duration, fh.usageProvider.Targets(), longOp) } select { case <-fh.stopChan: @@ -148,3 +136,55 @@ func (fh *realFsHandler) Usage() FsUsage { defer fh.RUnlock() return fh.usage } + +type fsUsageProvider struct { + fsInfo fs.FsInfo + rootFs string + extraDir string +} + +func NewGeneralFsUsageProvider(fsInfo fs.FsInfo, rootFs, extraDir string) FsUsageProvider { + return &fsUsageProvider{ + fsInfo: fsInfo, + rootFs: rootFs, + extraDir: extraDir, + } +} + +func (f *fsUsageProvider) Targets() []string { + return []string{f.rootFs, f.extraDir} +} + +func (f *fsUsageProvider) Usage() (*FsUsage, error) { + var ( + rootUsage, extraUsage fs.UsageInfo + rootErr, extraErr error + ) + + if f.rootFs != "" { + rootUsage, rootErr = f.fsInfo.GetDirUsage(f.rootFs) + } + + if f.extraDir != "" { + extraUsage, extraErr = f.fsInfo.GetDirUsage(f.extraDir) + } + + usage := &FsUsage{} + + if f.rootFs != "" && rootErr == nil { + usage.InodeUsage = rootUsage.Inodes + usage.BaseUsageBytes = rootUsage.Bytes + usage.TotalUsageBytes = rootUsage.Bytes + } + + if f.extraDir != "" && extraErr == nil { + usage.TotalUsageBytes += extraUsage.Bytes + } + + // Combine errors into a single error to return + if rootErr != nil || extraErr != nil { + return nil, fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr) + } + + return usage, nil +} diff --git a/container/containerd/client.go b/container/containerd/client.go index 47eaffad7bd..a236d2ade63 100644 --- a/container/containerd/client.go +++ b/container/containerd/client.go @@ -28,20 +28,24 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/pkg/dialer" ptypes "github.com/gogo/protobuf/types" + "github.com/google/cadvisor/container/common" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) type client struct { containerService containersapi.ContainersClient taskService tasksapi.TasksClient versionService versionapi.VersionClient + runtimeservice runtimeapi.RuntimeServiceClient } type ContainerdClient interface { LoadContainer(ctx context.Context, id string) (*containers.Container, error) TaskPid(ctx context.Context, id string) (uint32, error) Version(ctx context.Context) (string, error) + ContainerFsUsage(ctx context.Context, containerID string) (*common.FsUsage, error) } var once sync.Once @@ -92,6 +96,7 @@ func Client(address, namespace string) (ContainerdClient, error) { containerService: containersapi.NewContainersClient(conn), taskService: tasksapi.NewTasksClient(conn), versionService: versionapi.NewVersionClient(conn), + runtimeservice: runtimeapi.NewRuntimeServiceClient(conn), } }) return ctrdClient, retErr @@ -125,6 +130,33 @@ func (c *client) Version(ctx context.Context) (string, error) { return response.Version, nil } +func (c *client) ContainerFsUsage(ctx context.Context, containerID string) (*common.FsUsage, error) { + // containerd has cached the disk usage metrics in memory + rsp, err := c.runtimeservice.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{ + ContainerId: containerID, + }) + + if err != nil { + return nil, err + } + + if rsp.Stats == nil || rsp.Stats.WritableLayer == nil { + return nil, fmt.Errorf("container disk usage stats for container (%s) not found", containerID) + } + + usage := &common.FsUsage{} + + if rsp.Stats.WritableLayer.UsedBytes != nil { + usage.BaseUsageBytes = rsp.Stats.WritableLayer.UsedBytes.Value + usage.TotalUsageBytes = rsp.Stats.WritableLayer.UsedBytes.Value + } + + if rsp.Stats.WritableLayer.InodesUsed != nil { + usage.InodeUsage = rsp.Stats.WritableLayer.InodesUsed.Value + } + + return usage, nil +} func containerFromProto(containerpb containersapi.Container) *containers.Container { var runtime containers.RuntimeInfo if containerpb.Runtime != nil { diff --git a/container/containerd/client_test.go b/container/containerd/client_test.go index 7307a7d6820..8dffbdaa125 100644 --- a/container/containerd/client_test.go +++ b/container/containerd/client_test.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/containerd/containerd/containers" + "github.com/google/cadvisor/container/common" ) type containerdClientMock struct { @@ -45,6 +46,11 @@ func (c *containerdClientMock) TaskPid(ctx context.Context, id string) (uint32, return 2389, nil } +func (c *containerdClientMock) ContainerFsUsage(ctx context.Context, + containerdID string) (*common.FsUsage, error) { + return &common.FsUsage{}, nil +} + func mockcontainerdClient(cntrs map[string]*containers.Container, returnErr error) ContainerdClient { return &containerdClientMock{ cntrs: cntrs, diff --git a/container/containerd/handler.go b/container/containerd/handler.go index a8be0acbefe..a5d3a62f8ff 100644 --- a/container/containerd/handler.go +++ b/container/containerd/handler.go @@ -13,6 +13,7 @@ // limitations under the License. // Handler for containerd containers. + package containerd import ( @@ -38,6 +39,7 @@ type containerdContainerHandler struct { // (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test") cgroupPaths map[string]string fsInfo fs.FsInfo + fsHandler common.FsHandler // Metadata associated with the container. reference info.ContainerReference envs map[string]string @@ -122,9 +124,14 @@ func newContainerdContainerHandler( libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootfs, int(taskPid), includedMetrics) handler := &containerdContainerHandler{ - machineInfoFactory: machineInfoFactory, - cgroupPaths: cgroupPaths, - fsInfo: fsInfo, + machineInfoFactory: machineInfoFactory, + cgroupPaths: cgroupPaths, + fsInfo: fsInfo, + fsHandler: common.NewFsHandler(common.DefaultPeriod, &fsUsageProvider{ + ctx: ctx, + client: client, + containerID: cntr.ID, + }), envs: make(map[string]string), labels: cntr.Labels, includedMetrics: includedMetrics, @@ -169,9 +176,7 @@ func (h *containerdContainerHandler) needNet() bool { } func (h *containerdContainerHandler) GetSpec() (info.ContainerSpec, error) { - // TODO: Since we dont collect disk usage stats for containerd, we set hasFilesystem - // to false. Revisit when we support disk usage stats for containerd - hasFilesystem := false + hasFilesystem := true spec, err := common.GetSpec(h.cgroupPaths, h.machineInfoFactory, h.needNet(), hasFilesystem) spec.Labels = h.labels spec.Envs = h.envs @@ -189,6 +194,26 @@ func (h *containerdContainerHandler) getFsStats(stats *info.ContainerStats) erro if h.includedMetrics.Has(container.DiskIOMetrics) { common.AssignDeviceNamesToDiskStats((*common.MachineInfoNamer)(mi), &stats.DiskIo) } + if !h.includedMetrics.Has(container.DiskUsageMetrics) { + return nil + } + + // TODO(yyrdl):for overlay ,the 'upperPath' is: + // `${containerd.Config.Root}/io.containerd.snapshotter.v1.overlayfs/snapshots/${snapshots.ID}/fs`, + // and for other snapshots plugins, we can also find the law from containerd's source code. + + // Device 、fsType and fsLimits and other information are not supported yet, unless there is a way to + // know the id of the snapshot , or the `Stat`(snapshotsClient.Stat) method returns these information directly. + // And containerd has cached the disk usage stats in memory,so the best way is enhancing containerd's API + // (avoid collecting disk usage metrics twice) + fsStat := info.FsStats{} + usage := h.fsHandler.Usage() + fsStat.BaseUsage = usage.BaseUsageBytes + fsStat.Usage = usage.TotalUsageBytes + fsStat.Inodes = usage.InodeUsage + + stats.Filesystem = append(stats.Filesystem, fsStat) + return nil } @@ -239,12 +264,32 @@ func (h *containerdContainerHandler) Type() container.ContainerType { } func (h *containerdContainerHandler) Start() { + if h.fsHandler != nil { + h.fsHandler.Start() + } } func (h *containerdContainerHandler) Cleanup() { + if h.fsHandler != nil { + h.fsHandler.Stop() + } } func (h *containerdContainerHandler) GetContainerIPAddress() string { // containerd doesnt take care of networking.So it doesnt maintain networking states return "" } + +type fsUsageProvider struct { + ctx context.Context + containerID string + client ContainerdClient +} + +func (f *fsUsageProvider) Usage() (*common.FsUsage, error) { + return f.client.ContainerFsUsage(f.ctx, f.containerID) +} + +func (f *fsUsageProvider) Targets() []string { + return []string{fmt.Sprintf("containerd %s", f.containerID)} +} diff --git a/container/crio/handler.go b/container/crio/handler.go index b1ef5b045de..4af04197dbf 100644 --- a/container/crio/handler.go +++ b/container/crio/handler.go @@ -183,7 +183,8 @@ func newCrioContainerHandler( // we optionally collect disk usage metrics if includedMetrics.Has(container.DiskUsageMetrics) { - handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, storageLogDir, fsInfo) + handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider( + fsInfo, rootfsStorageDir, storageLogDir)) } // TODO for env vars we wanted to show from container.Config.Env from whitelist //for _, exposedEnv := range metadataEnvs { diff --git a/container/docker/handler.go b/container/docker/handler.go index e9afc752446..4ca90fc994e 100644 --- a/container/docker/handler.go +++ b/container/docker/handler.go @@ -240,7 +240,8 @@ func newDockerContainerHandler( if includedMetrics.Has(container.DiskUsageMetrics) { handler.fsHandler = &dockerFsHandler{ - fsHandler: common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, otherStorageDir, fsInfo), + fsHandler: common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider( + fsInfo, rootfsStorageDir, otherStorageDir)), thinPoolWatcher: thinPoolWatcher, zfsWatcher: zfsWatcher, deviceID: ctnr.GraphDriver.Data["DeviceId"], diff --git a/go.mod b/go.mod index 8eb61283ae6..b9695653cca 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 golang.org/x/sys v0.0.0-20210426230700-d19ff857e887 google.golang.org/grpc v1.33.2 + k8s.io/cri-api v0.20.6 k8s.io/klog/v2 v2.4.0 k8s.io/utils v0.0.0-20201110183641-67b214c5f920 ) diff --git a/go.sum b/go.sum index fab53c9865f..84d62bf14a1 100644 --- a/go.sum +++ b/go.sum @@ -978,6 +978,7 @@ k8s.io/component-base v0.20.6/go.mod h1:6f1MPBAeI+mvuts3sIdtpjljHWBQ2cIy38oBIWMY k8s.io/cri-api v0.17.3/go.mod h1:X1sbHmuXhwaHs9xxYffLqJogVsnI+f6cPRcgPel7ywM= k8s.io/cri-api v0.20.1/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI= k8s.io/cri-api v0.20.4/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI= +k8s.io/cri-api v0.20.6 h1:iXX0K2pRrbR8yXbZtDK/bSnmg/uSqIFiVJK1x4LUOMc= k8s.io/cri-api v0.20.6/go.mod h1:ew44AjNXwyn1s0U4xCKGodU7J1HzBeZ1MpGrpa5r8Yc= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=