Skip to content

Commit

Permalink
Add container tagging
Browse files Browse the repository at this point in the history
  • Loading branch information
gjulianm committed Oct 9, 2024
1 parent 223f558 commit 916bf78
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 2 deletions.
6 changes: 6 additions & 0 deletions pkg/collector/corechecks/gpu/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,15 @@ type StreamKey struct {
Stream uint64 `json:"stream"`
}

// StreamMetadata contains metadata for a stream, such as container ID.
// It describes the stream as a whole rather than any individual span or
// allocation.
type StreamMetadata struct {
// ContainerID is serialized under the "container_id" JSON key.
// NOTE(review): presumably the ID of the container running the process
// that owns the stream, and empty when it cannot be resolved — confirm
// against the code that populates it.
ContainerID string `json:"container_id"`
}

// StreamData contains kernel spans and allocations for a stream, together
// with the key identifying the stream and its metadata.
type StreamData struct {
// Key identifies the stream this data belongs to.
Key StreamKey `json:"key"`
// Metadata holds stream-level metadata, such as the container ID.
Metadata StreamMetadata `json:"metadata"`
// Spans lists the kernel spans recorded for the stream.
Spans []*KernelSpan `json:"spans"`
// Allocations lists the memory allocations recorded for the stream.
Allocations []*MemoryAllocation `json:"allocations"`
}
Expand Down
25 changes: 24 additions & 1 deletion pkg/gpu/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/gpu/cuda"
"github.com/DataDog/datadog-agent/pkg/network/events"
"github.com/DataDog/datadog-agent/pkg/network/tracer"
sectime "github.com/DataDog/datadog-agent/pkg/security/resolvers/time"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
)

Expand All @@ -24,6 +27,12 @@ type systemContext struct {

// pidMaps maps each process ID to its memory maps
pidMaps map[int]*kernel.ProcMapEntries

// timeResolver is used to convert from kernel time to system time
timeResolver *sectime.Resolver

// processCache is used to resolve process information
processCache *tracer.ProcessCache
}

// fileData holds the symbol table and Fatbin data for a given file.
Expand All @@ -38,14 +47,28 @@ func (fd *fileData) updateAccessTime() {
}

func getSystemContext() (*systemContext, error) {
var err error

ctx := &systemContext{
fileData: make(map[string]*fileData),
pidMaps: make(map[int]*kernel.ProcMapEntries),
}
if err := ctx.queryDevices(); err != nil {

if err = ctx.queryDevices(); err != nil {
return nil, fmt.Errorf("error querying devices: %w", err)
}

ctx.timeResolver, err = sectime.NewResolver()
if err != nil {
return nil, fmt.Errorf("cannot create time resolver: %s", err)
}

ctx.processCache, err = tracer.NewProcessCache(32768)
if err != nil {
return nil, fmt.Errorf("cannot create process cache: %s", err)
}
events.RegisterHandler(ctx.processCache)

return ctx, nil
}

Expand Down
35 changes: 34 additions & 1 deletion pkg/gpu/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package gpu
import (
"fmt"
"math"
"time"

"github.com/DataDog/datadog-agent/pkg/collector/corechecks/gpu/model"
ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
Expand Down Expand Up @@ -199,16 +200,45 @@ func getAssociatedAllocations(span *model.KernelSpan) []*model.MemoryAllocation
return allocations
}

// getDataMaxTimestamp returns the largest EndKtime found across the
// handler's kernel spans and allocations. It returns 0 when the handler
// holds neither.
func (sh *StreamHandler) getDataMaxTimestamp() uint64 {
	var latest uint64

	for _, ks := range sh.kernelSpans {
		if ks.EndKtime > latest {
			latest = ks.EndKtime
		}
	}

	for _, ma := range sh.allocations {
		if ma.EndKtime > latest {
			latest = ma.EndKtime
		}
	}

	return latest
}

// buildMetadata returns the metadata for this stream as of dataTimestamp.
//
// It looks up the stream's PID in the system context's process cache at
// the given time and copies the process's container ID into the result.
// The container ID is left empty when the process is not found in the
// cache or when the cached value is not a string.
func (sh *StreamHandler) buildMetadata(dataTimestamp time.Time) model.StreamMetadata {
	var metadata model.StreamMetadata

	proc, found := sh.sysCtx.processCache.Get(sh.key.Pid, dataTimestamp.UnixNano())
	if !found {
		return metadata
	}

	// Use the comma-ok form: a bare .(string) assertion would panic if the
	// stored value were anything other than a string (including nil).
	if id, ok := proc.ContainerID.Get().(string); ok {
		metadata.ContainerID = id
	}

	return metadata
}

// getPastData returns all the events that have finished (kernel spans with synchronizations/allocations that have been freed)
// If flush is true, the data will be cleared from the handler
func (sh *StreamHandler) getPastData(flush bool) *model.StreamData {
if len(sh.kernelSpans) == 0 && len(sh.allocations) == 0 {
return nil
}

maxTs := sh.sysCtx.timeResolver.ResolveMonotonicTimestamp(sh.getDataMaxTimestamp())

data := &model.StreamData{
Spans: sh.kernelSpans,
Allocations: sh.allocations,
Metadata: sh.buildMetadata(maxTs),
}

if flush {
Expand All @@ -224,8 +254,11 @@ func (sh *StreamHandler) getCurrentData(now uint64) *model.StreamData {
return nil
}

nowTs := sh.sysCtx.timeResolver.ResolveMonotonicTimestamp(now)

data := &model.StreamData{
Spans: []*model.KernelSpan{sh.getCurrentKernelSpan(now)},
Spans: []*model.KernelSpan{sh.getCurrentKernelSpan(now)},
Metadata: sh.buildMetadata(nowTs),
}

for _, alloc := range sh.memAllocEvents {
Expand Down

0 comments on commit 916bf78

Please sign in to comment.