Skip to content

Commit

Permalink
[EBPF] gpu: associate streams to containers (#29651)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjulianm authored Nov 11, 2024
1 parent cf76f8b commit e07fb1b
Show file tree
Hide file tree
Showing 15 changed files with 220 additions and 60 deletions.
File renamed without changes.
10 changes: 9 additions & 1 deletion pkg/gpu/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ import (
"time"
"unsafe"

"golang.org/x/sys/unix"

ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/gpu/config"
gpuebpf "github.com/DataDog/datadog-agent/pkg/gpu/ebpf"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/cgroups"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand Down Expand Up @@ -109,7 +112,12 @@ func (c *cudaEventConsumer) Start() {
key := streamKey{pid: pid, stream: header.Stream_id}

if _, ok := c.streamHandlers[key]; !ok {
c.streamHandlers[key] = newStreamHandler(key.pid, c.sysCtx)
cgroup := unix.ByteSliceToString(header.Cgroup[:])
containerID, err := cgroups.ContainerFilter("", cgroup)
if err != nil {
log.Errorf("error getting container ID for cgroup %s: %s", cgroup, err)
}
c.streamHandlers[key] = newStreamHandler(key.pid, containerID, c.sysCtx)
}

switch header.Type {
Expand Down
29 changes: 13 additions & 16 deletions pkg/gpu/ebpf/c/runtime/gpu.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "map-defs.h"
#include "bpf_telemetry.h"
#include "bpf_builtins.h"
#include "cgroup.h"

#include "types.h"

Expand All @@ -32,6 +33,14 @@ static inline void load_dim3(__u64 xy, __u64 z, dim3 *dst) {
dst->z = z;
}

// fill_header populates the header fields shared by every CUDA event:
// the calling task's pid/tgid, the current kernel timestamp, the stream the
// event belongs to, the event type and the cgroup buffer.
//
// header:    event header to fill in; written in place.
// stream_id: identifier of the CUDA stream associated with the event.
// type:      kind of event being emitted.
static inline void fill_header(cuda_event_header_t *header, __u64 stream_id, cuda_event_type_t type) {
header->pid_tgid = bpf_get_current_pid_tgid();
header->ktime_ns = bpf_ktime_get_ns();
header->stream_id = stream_id;
header->type = type;
// NOTE(review): get_cgroup_name comes from cgroup.h; presumably it copies the
// current task's cgroup name into the buffer. Any status it may return is not
// checked here — confirm that ignoring it is intended.
get_cgroup_name(header->cgroup, sizeof(header->cgroup));
}

SEC("uprobe/cudaLaunchKernel")
int BPF_UPROBE(uprobe__cudaLaunchKernel, const void *func, __u64 grid_xy, __u64 grid_z, __u64 block_xy, __u64 block_z, void **args) {
cuda_kernel_launch_t launch_data = { 0 };
Expand All @@ -53,10 +62,7 @@ int BPF_UPROBE(uprobe__cudaLaunchKernel, const void *func, __u64 grid_xy, __u64

load_dim3(grid_xy, grid_z, &launch_data.grid_size);
load_dim3(block_xy, block_z, &launch_data.block_size);
launch_data.header.pid_tgid = bpf_get_current_pid_tgid();
launch_data.header.ktime_ns = bpf_ktime_get_ns();
launch_data.header.stream_id = (uint64_t)stream;
launch_data.header.type = cuda_kernel_launch;
fill_header(&launch_data.header, stream, cuda_kernel_launch);
launch_data.kernel_addr = (uint64_t)func;
launch_data.shared_mem_size = shared_mem;

Expand Down Expand Up @@ -93,10 +99,7 @@ int BPF_URETPROBE(uretprobe__cudaMalloc) {
return 0;
}

mem_data.header.pid_tgid = bpf_get_current_pid_tgid();
mem_data.header.stream_id = (uint64_t)0;
mem_data.header.type = cuda_memory_event;
mem_data.header.ktime_ns = bpf_ktime_get_ns();
fill_header(&mem_data.header, 0, cuda_memory_event);
mem_data.type = cudaMalloc;
mem_data.size = args->size;

Expand All @@ -118,10 +121,7 @@ SEC("uprobe/cudaFree")
int BPF_UPROBE(uprobe__cudaFree, void *mem) {
cuda_memory_event_t mem_data = { 0 };

mem_data.header.pid_tgid = bpf_get_current_pid_tgid();
mem_data.header.stream_id = (uint64_t)0;
mem_data.header.type = cuda_memory_event;
mem_data.header.ktime_ns = bpf_ktime_get_ns();
fill_header(&mem_data.header, 0, cuda_memory_event);
mem_data.size = 0;
mem_data.addr = (uint64_t)mem;
mem_data.type = cudaFree;
Expand Down Expand Up @@ -155,10 +155,7 @@ int BPF_URETPROBE(uretprobe__cudaStreamSynchronize) {
return 0;
}

event.header.pid_tgid = bpf_get_current_pid_tgid();
event.header.stream_id = *stream;
event.header.type = cuda_sync;
event.header.ktime_ns = bpf_ktime_get_ns();
fill_header(&event.header, *stream, cuda_sync);

log_debug("cudaStreamSynchronize[ret]: EMIT cudaSync pid_tgid=%llu, stream_id=%llu", event.header.pid_tgid, event.header.stream_id);

Expand Down
3 changes: 3 additions & 0 deletions pkg/gpu/ebpf/c/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ typedef enum {
cuda_sync
} cuda_event_type_t;

// Size in bytes of the cgroup buffer carried in every event header.
// NOTE(review): the name refers to a container ID while the buffer it sizes is
// called "cgroup"; 129 is presumably 128 characters plus a terminating NUL —
// confirm.
#define MAX_CONTAINER_ID_LEN 129

// Header common to all CUDA events sent to user space.
typedef struct {
// Kind of event this header belongs to.
cuda_event_type_t type;
// Combined pid/tgid value as returned by bpf_get_current_pid_tgid().
__u64 pid_tgid;
// Identifier of the CUDA stream the event is associated with.
__u64 stream_id;
// Kernel timestamp in nanoseconds as returned by bpf_ktime_get_ns().
__u64 ktime_ns;
// Cgroup name buffer; user space reads it as a NUL-terminated string and
// derives the container ID from it.
char cgroup[MAX_CONTAINER_ID_LEN];
} cuda_event_header_t;

typedef struct {
Expand Down
10 changes: 6 additions & 4 deletions pkg/gpu/ebpf/kprobe_types_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 42 additions & 5 deletions pkg/gpu/probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func (s *probeTestSuite) TestCanReceiveEvents() {

probe := s.getProbe()

cmd, err := testutil.RunSample(t, testutil.CudaSample)
require.NoError(t, err)
cmd := testutil.RunSample(t, testutil.CudaSample)

utils.WaitForProgramsToBeTraced(t, gpuAttacherName, cmd.Process.Pid, utils.ManualTracingFallbackDisabled)

Expand Down Expand Up @@ -116,8 +115,7 @@ func (s *probeTestSuite) TestCanGenerateStats() {

probe := s.getProbe()

cmd, err := testutil.RunSample(t, testutil.CudaSample)
require.NoError(t, err)
cmd := testutil.RunSample(t, testutil.CudaSample)

utils.WaitForProgramsToBeTraced(t, gpuAttacherName, cmd.Process.Pid, utils.ManualTracingFallbackDisabled)

Expand All @@ -126,7 +124,6 @@ func (s *probeTestSuite) TestCanGenerateStats() {
require.Eventually(t, func() bool {
return !utils.IsProgramTraced(gpuAttacherName, cmd.Process.Pid)
}, 20*time.Second, 500*time.Millisecond, "process not stopped")
require.NoError(t, err)

stats, err := probe.GetAndFlush()
require.NoError(t, err)
Expand All @@ -138,3 +135,43 @@ func (s *probeTestSuite) TestCanGenerateStats() {
require.Greater(t, pidStats.UtilizationPercentage, 0.0) // percentage depends on the time this took to run, so it's not deterministic
require.Equal(t, pidStats.Memory.MaxBytes, uint64(110))
}

// TestDetectsContainer runs the CUDA sample inside a Docker container and
// verifies that every stream handler created for that process carries the ID
// of the container, and that stats are still generated for the process.
func (s *probeTestSuite) TestDetectsContainer() {
	t := s.T()

	procMon := monitor.GetProcessMonitor()
	require.NotNil(t, procMon)
	require.NoError(t, procMon.Initialize(false))
	t.Cleanup(procMon.Stop)

	probe := s.getProbe()

	args := testutil.GetDefaultArgs()
	args.EndWaitTimeSec = 1
	pid, cid := testutil.RunSampleInDockerWithArgs(t, testutil.CudaSample, testutil.MinimalDockerImage, args)

	utils.WaitForProgramsToBeTraced(t, gpuAttacherName, pid, utils.ManualTracingFallbackDisabled)

	// Poll until the process is no longer traced instead of blocking on its
	// exit, so that a hung sample makes the test time out cleanly.
	require.Eventually(t, func() bool {
		return !utils.IsProgramTraced(gpuAttacherName, pid)
	}, 20*time.Second, 500*time.Millisecond, "process not stopped")

	// Check that the stream handlers have the correct container ID assigned.
	// Count the matching handlers: without this the loop would pass vacuously
	// if no handler had been created for the process, and the container ID
	// association would go completely unverified.
	matchedHandlers := 0
	for key, handler := range probe.consumer.streamHandlers {
		if key.pid != uint32(pid) {
			continue
		}
		matchedHandlers++
		require.Equal(t, cid, handler.containerID)
	}
	require.NotZero(t, matchedHandlers, "no stream handlers found for pid %d", pid)

	stats, err := probe.GetAndFlush()
	require.NoError(t, err)
	require.NotNil(t, stats)
	require.NotEmpty(t, stats.ProcessStats)
	require.Contains(t, stats.ProcessStats, uint32(pid))

	pidStats := stats.ProcessStats[uint32(pid)]
	require.Greater(t, pidStats.UtilizationPercentage, 0.0) // percentage depends on the time this took to run, so it's not deterministic
	// Expected value goes first so that failure messages label the values correctly.
	require.Equal(t, uint64(110), pidStats.Memory.MaxBytes)
}
4 changes: 3 additions & 1 deletion pkg/gpu/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type StreamHandler struct {
allocations []*memoryAllocation
processEnded bool // A marker to indicate that the process has ended, and this handler should be flushed
sysCtx *systemContext
containerID string
}

// enrichedKernelLaunch is a kernel launch event with the kernel data attached.
Expand Down Expand Up @@ -110,11 +111,12 @@ type kernelSpan struct {
avgMemoryUsage map[memAllocType]uint64
}

func newStreamHandler(pid uint32, sysCtx *systemContext) *StreamHandler {
// newStreamHandler builds the handler for a single stream of the process
// identified by pid. containerID is the ID of the container the process runs
// in (empty when none was resolved) and sysCtx is the shared system context.
// The map of memory allocation events starts out empty.
func newStreamHandler(pid uint32, containerID string, sysCtx *systemContext) *StreamHandler {
	handler := new(StreamHandler)
	handler.pid = pid
	handler.containerID = containerID
	handler.sysCtx = sysCtx
	handler.memAllocEvents = make(map[uint64]gpuebpf.CudaMemEvent)
	return handler
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/gpu/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
func TestKernelLaunchesHandled(t *testing.T) {
sysCtx, err := getSystemContext(testutil.GetBasicNvmlMock(), kernel.ProcFSRoot())
require.NoError(t, err)
stream := newStreamHandler(0, sysCtx)
stream := newStreamHandler(0, "", sysCtx)

kernStartTime := uint64(1)
launch := &gpuebpf.CudaKernelLaunch{
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestKernelLaunchesHandled(t *testing.T) {
func TestMemoryAllocationsHandled(t *testing.T) {
sysCtx, err := getSystemContext(testutil.GetBasicNvmlMock(), kernel.ProcFSRoot())
require.NoError(t, err)
stream := newStreamHandler(0, sysCtx)
stream := newStreamHandler(0, "", sysCtx)

memAllocTime := uint64(1)
memFreeTime := uint64(2)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestMemoryAllocationsHandled(t *testing.T) {
func TestMemoryAllocationsDetectLeaks(t *testing.T) {
sysCtx, err := getSystemContext(testutil.GetBasicNvmlMock(), kernel.ProcFSRoot())
require.NoError(t, err)
stream := newStreamHandler(0, sysCtx)
stream := newStreamHandler(0, "", sysCtx)

memAllocTime := uint64(1)
memAddr := uint64(42)
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestMemoryAllocationsDetectLeaks(t *testing.T) {
func TestMemoryAllocationsNoCrashOnInvalidFree(t *testing.T) {
sysCtx, err := getSystemContext(testutil.GetBasicNvmlMock(), kernel.ProcFSRoot())
require.NoError(t, err)
stream := newStreamHandler(0, sysCtx)
stream := newStreamHandler(0, "", sysCtx)

memAllocTime := uint64(1)
memFreeTime := uint64(2)
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestMemoryAllocationsNoCrashOnInvalidFree(t *testing.T) {
func TestMemoryAllocationsMultipleAllocsHandled(t *testing.T) {
sysCtx, err := getSystemContext(testutil.GetBasicNvmlMock(), kernel.ProcFSRoot())
require.NoError(t, err)
stream := newStreamHandler(0, sysCtx)
stream := newStreamHandler(0, "", sysCtx)

memAllocTime1, memAllocTime2 := uint64(1), uint64(10)
memFreeTime1, memFreeTime2 := uint64(15), uint64(20)
Expand Down Expand Up @@ -361,7 +361,7 @@ func TestKernelLaunchesIncludeEnrichedKernelData(t *testing.T) {

sysCtx.deviceSmVersions = map[int]int{0: int(smVersion)}

stream := newStreamHandler(uint32(pid), sysCtx)
stream := newStreamHandler(uint32(pid), "", sysCtx)

kernStartTime := uint64(1)
launch := &gpuebpf.CudaKernelLaunch{
Expand Down
2 changes: 2 additions & 0 deletions pkg/gpu/testdata/cudasample.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ int main(int argc, char **argv) {

fprintf(stderr, "Starting calls.\n");

fprintf(stderr, "Starting!\n");

cudaLaunchKernel((void *)0x1234, (dim3){ 1, 2, 3 }, (dim3){ 4, 5, 6 }, NULL, 10, stream);
void *ptr;
cudaMalloc(&ptr, 100);
Expand Down
8 changes: 7 additions & 1 deletion pkg/gpu/testutil/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"os"
"os/exec"
"sync"

"github.com/DataDog/datadog-agent/pkg/util/log"
)

// mutex to protect the build process
Expand All @@ -29,10 +31,14 @@ func buildCBinary(srcDir, outPath string) (string, error) {
// If there is a compiled binary already, skip the compilation.
// Meant for the CI.
if _, err := os.Stat(cachedServerBinaryPath); err == nil {
log.Debugf("Using cached test binary: %s", cachedServerBinaryPath)
return cachedServerBinaryPath, nil
}

c := exec.Command("clang", serverSrcDir, "-o", cachedServerBinaryPath)
// Build statically to avoid issues with shared libraries (especially libc if we run in alpine)
buildCmd := []string{"clang", "-static", serverSrcDir, "-o", cachedServerBinaryPath}
log.Debugf("Building test binary: %s", buildCmd)
c := exec.Command(buildCmd[0], buildCmd[1:]...)
out, err := c.CombinedOutput()
if err != nil {
return "", fmt.Errorf("could not build test binary: %s\noutput: %s", err, string(out))
Expand Down
29 changes: 29 additions & 0 deletions pkg/gpu/testutil/docker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build test

// Package testutil holds different utilities and stubs for testing
package testutil

import (
"bytes"
"fmt"
"os/exec"
"strings"
)

// GetDockerContainerID returns the ID of a docker container.
// GetDockerContainerID returns the ID of the docker container identified by
// dockerName, as printed by `docker inspect -f {{.Id}}`, with surrounding
// whitespace removed.
//
// On failure it returns an empty ID and an error that wraps the underlying
// exec error (so callers can use errors.Is / errors.As) and includes whatever
// docker wrote to stderr.
func GetDockerContainerID(dockerName string) (string, error) {
	var stdout, stderr bytes.Buffer

	c := exec.Command("docker", "inspect", "-f", "{{.Id}}", dockerName)
	c.Stdout = &stdout
	c.Stderr = &stderr

	if err := c.Run(); err != nil {
		// Keep err in the chain: when the docker binary cannot be started at
		// all, stderr is empty and err is the only description of the failure.
		return "", fmt.Errorf("failed to get %s ID: %w (stderr: %s)", dockerName, err, strings.TrimSpace(stderr.String()))
	}

	return strings.TrimSpace(stdout.String()), nil
}
Loading

0 comments on commit e07fb1b

Please sign in to comment.