docker: use streaming stats collection to correct CPU stats #24229

Merged · 1 commit · Oct 17, 2024
Changes from all commits
3 changes: 3 additions & 0 deletions .changelog/24229.txt
@@ -0,0 +1,3 @@
```release-note:bug
docker: Fixed a bug where task CPU stats were reported incorrectly
```
64 changes: 64 additions & 0 deletions drivers/docker/driver_test.go
@@ -30,6 +30,7 @@ import (
"github.com/docker/docker/errdefs"
"github.com/docker/go-connections/nat"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/taskenv"
@@ -3223,3 +3224,66 @@ func TestDockerDriver_GroupAdd(t *testing.T) {

must.Eq(t, cfg.GroupAdd, container.HostConfig.GroupAdd)
}

// TestDockerDriver_CollectStats verifies that the TaskStats API collects stats
// periodically and that these values are non-zero as expected
func TestDockerDriver_CollectStats(t *testing.T) {
ci.Parallel(t)
testutil.RequireLinux(t) // stats outputs are different on Windows
testutil.DockerCompatible(t)

// we want to generate at least some CPU usage
args := []string{"/bin/sh", "-c", "cat /dev/urandom | base64 > /dev/null"}
taskCfg := newTaskConfig("", args)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "nc-demo",
AllocID: uuid.Generate(),
Resources: basicResources,
}
must.NoError(t, task.EncodeConcreteDriverConfig(&taskCfg))

d := dockerDriverHarness(t, nil)
plugin, ok := d.Impl().(*Driver)
must.True(t, ok)
plugin.compute.TotalCompute = 1000
plugin.compute.NumCores = 1

cleanup := d.MkAllocDir(task, true)
defer cleanup()
copyImage(t, task.TaskDir(), "busybox.tar")

_, _, err := d.StartTask(task)
must.NoError(t, err)

defer d.DestroyTask(task.ID, true)

// this test has to run for a while because the minimum stats interval we
// can get from Docker is 1s
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)
recv, err := d.TaskStats(ctx, task.ID, time.Second)
must.NoError(t, err)

statsReceived := 0
tickValues := set.From([]float64{})

DONE:
for {
select {
case stats := <-recv:
statsReceived++
ticks := stats.ResourceUsage.CpuStats.TotalTicks
must.Greater(t, 0, ticks)
tickValues.Insert(ticks)
if statsReceived >= 3 {
cancel() // 3 is plenty
}
case <-ctx.Done():
break DONE
}
}

// CPU stats should change with every interval
must.Len(t, statsReceived, tickValues.Slice())
}
26 changes: 16 additions & 10 deletions drivers/docker/stats.go
@@ -66,7 +66,6 @@ func (u *usageSender) send(tru *cstructs.TaskResourceUsage) {
func (u *usageSender) close() {
u.mu.Lock()
defer u.mu.Unlock()

if u.closed {
// already closed
return
@@ -97,22 +96,29 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
timer, cancel := helper.NewSafeTimer(interval)
defer cancel()

// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
}
defer statsReader.Body.Close()

collectOnce := func() {
defer timer.Reset(interval)
statsReader, err := h.dockerClient.ContainerStatsOneShot(ctx, h.containerID)
var stats *containerapi.Stats
err := json.NewDecoder(statsReader.Body).Decode(&stats)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
h.logger.Debug("error decoding stats data from container", "error", err)
return
}
defer statsReader.Body.Close()

var stats containerapi.Stats
if err := json.NewDecoder(statsReader.Body).Decode(&stats); err != nil {
h.logger.Error("error decoding stats data for container", "error", err)
if stats == nil {
h.logger.Debug("error decoding stats data: stats were nil")
return
}

resourceUsage := util.DockerStatsToTaskResourceUsage(&stats, compute)
resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
}

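For context on why the streaming endpoint matters here: Docker reports CPU time as cumulative counters, so a usable utilization figure has to be derived from the delta between two consecutive samples. The sketch below is illustrative only — the `cpuPercent` helper, the sample numbers, and the percentage output are assumptions for this example, not Nomad's actual `util.DockerStatsToTaskResourceUsage` implementation (which converts usage into ticks against the node's compute). It shows the standard delta calculation and why a one-shot read, which has no previous sample to diff against, produces meaningless CPU values.

```go
package main

import "fmt"

// cpuPercent mirrors the usual delta calculation for Docker CPU stats:
// compare the container's cumulative CPU time and the host's cumulative
// CPU time between two consecutive reads. A one-shot read has no previous
// sample to diff against, so these deltas are undefined.
func cpuPercent(prevContainer, prevSystem, curContainer, curSystem uint64, onlineCPUs float64) float64 {
	containerDelta := float64(curContainer) - float64(prevContainer)
	systemDelta := float64(curSystem) - float64(prevSystem)
	if systemDelta <= 0 || containerDelta <= 0 {
		return 0.0
	}
	return (containerDelta / systemDelta) * onlineCPUs * 100.0
}

func main() {
	// Hypothetical consecutive samples taken one second apart (nanoseconds).
	fmt.Printf("%.1f%%\n", cpuPercent(
		1_000_000_000, 40_000_000_000, // previous read
		1_500_000_000, 44_000_000_000, // current read
		4, // online CPUs
	))
	// Output: 50.0%
}
```

With the streaming reader, each decoded `containerapi.Stats` carries the previous read in its `PreCPUStats` field, so the deltas above are well defined on every interval; the one-shot endpoint leaves that field zeroed, which is what produced the incorrect CPU stats this PR fixes.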