diff --git a/pkg/collector/corechecks/gpu/model/model.go b/pkg/collector/corechecks/gpu/model/model.go index d5184416bae18f..ec36d3b3d5934a 100644 --- a/pkg/collector/corechecks/gpu/model/model.go +++ b/pkg/collector/corechecks/gpu/model/model.go @@ -73,9 +73,15 @@ type StreamKey struct { Stream uint64 `json:"stream"` } +// StreamMetadata contains metadata for a stream, such as container ID +type StreamMetadata struct { + ContainerID string `json:"container_id"` +} + // StreamData contains kernel spans and allocations for a stream type StreamData struct { Key StreamKey `json:"key"` + Metadata StreamMetadata `json:"metadata"` Spans []*KernelSpan `json:"spans"` Allocations []*MemoryAllocation `json:"allocations"` } diff --git a/pkg/gpu/context.go b/pkg/gpu/context.go index f931048bfa8bc9..67af975c3eea3b 100644 --- a/pkg/gpu/context.go +++ b/pkg/gpu/context.go @@ -11,6 +11,9 @@ import ( "time" "github.com/DataDog/datadog-agent/pkg/gpu/cuda" + "github.com/DataDog/datadog-agent/pkg/network/events" + "github.com/DataDog/datadog-agent/pkg/network/tracer" + sectime "github.com/DataDog/datadog-agent/pkg/security/resolvers/time" "github.com/DataDog/datadog-agent/pkg/util/kernel" ) @@ -24,6 +27,12 @@ type systemContext struct { // pidMaps maps each process ID to its memory maps pidMaps map[int]*kernel.ProcMapEntries + + // timeResolver is used to convert from kernel time to system time + timeResolver *sectime.Resolver + + // processCache is used to resolve process information + processCache *tracer.ProcessCache } // fileData holds the symbol table and Fatbin data for a given file. @@ -38,14 +47,28 @@ func (fd *fileData) updateAccessTime() { } func getSystemContext() (*systemContext, error) { + var err error + ctx := &systemContext{ fileData: make(map[string]*fileData), pidMaps: make(map[int]*kernel.ProcMapEntries), } - if err := ctx.queryDevices(); err != nil { + + if err = ctx.queryDevices(); err != nil { return nil, fmt.Errorf("error querying devices: %w", err) } + ctx.timeResolver, err = sectime.NewResolver() + if err != nil { + return nil, fmt.Errorf("cannot create time resolver: %s", err) + } + + ctx.processCache, err = tracer.NewProcessCache(32768) + if err != nil { + return nil, fmt.Errorf("cannot create process cache: %s", err) + } + events.RegisterHandler(ctx.processCache) + return ctx, nil } diff --git a/pkg/gpu/stream.go b/pkg/gpu/stream.go index a4e47322ea3a81..55cb3a806b18ed 100644 --- a/pkg/gpu/stream.go +++ b/pkg/gpu/stream.go @@ -10,6 +10,7 @@ package gpu import ( "fmt" "math" + "time" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/gpu/model" ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" @@ -199,6 +200,32 @@ func getAssociatedAllocations(span *model.KernelSpan) []*model.MemoryAllocation return allocations } +func (sh *StreamHandler) getDataMaxTimestamp() uint64 { + maxTs := uint64(0) + + for _, span := range sh.kernelSpans { + maxTs = max(maxTs, span.EndKtime) + } + + for _, alloc := range sh.allocations { + maxTs = max(maxTs, alloc.EndKtime) + } + + return maxTs +} + +func (sh *StreamHandler) buildMetadata(dataTimestamp time.Time) model.StreamMetadata { + proc, found := sh.sysCtx.processCache.Get(sh.key.Pid, dataTimestamp.UnixNano()) + var containerID string + if found { + containerID = proc.ContainerID.Get().(string) + } + + return model.StreamMetadata{ + ContainerID: containerID, + } +} + // getPastData returns all the events that have finished (kernel spans with synchronizations/allocations that have been freed) // If flush is true, the data will be cleared from the handler func (sh *StreamHandler) getPastData(flush bool) *model.StreamData { @@ -206,9 +233,12 @@ func (sh *StreamHandler) getPastData(flush bool) *model.StreamData { return nil } + maxTs := sh.sysCtx.timeResolver.ResolveMonotonicTimestamp(sh.getDataMaxTimestamp()) + data := &model.StreamData{ Spans: sh.kernelSpans, Allocations: sh.allocations, + Metadata: sh.buildMetadata(maxTs), } if flush { @@ -224,8 +254,11 @@ func (sh *StreamHandler) getCurrentData(now uint64) *model.StreamData { return nil } + nowTs := sh.sysCtx.timeResolver.ResolveMonotonicTimestamp(now) + data := &model.StreamData{ - Spans: []*model.KernelSpan{sh.getCurrentKernelSpan(now)}, + Spans: []*model.KernelSpan{sh.getCurrentKernelSpan(now)}, + Metadata: sh.buildMetadata(nowTs), } for _, alloc := range sh.memAllocEvents {