diff --git a/pkg/collector/corechecks/servicediscovery/events.go b/pkg/collector/corechecks/servicediscovery/events.go index 996268a0067a1..7a1bd49dcc04d 100644 --- a/pkg/collector/corechecks/servicediscovery/events.go +++ b/pkg/collector/corechecks/servicediscovery/events.go @@ -39,6 +39,7 @@ type eventPayload struct { Ports []uint16 `json:"ports"` PID int `json:"pid"` CommandLine []string `json:"command_line"` + RSSMemory uint64 `json:"rss_memory"` } type event struct { @@ -73,6 +74,7 @@ func (ts *telemetrySender) newEvent(t eventType, svc serviceInfo) *event { Ports: svc.process.Ports, PID: svc.process.PID, CommandLine: svc.process.CmdLine, + RSSMemory: svc.process.Stat.RSS, }, } } diff --git a/pkg/collector/corechecks/servicediscovery/events_test.go b/pkg/collector/corechecks/servicediscovery/events_test.go index ec04b6318be2a..c3803fb1ce9ad 100644 --- a/pkg/collector/corechecks/servicediscovery/events_test.go +++ b/pkg/collector/corechecks/servicediscovery/events_test.go @@ -60,6 +60,7 @@ func Test_telemetrySender(t *testing.T) { Env: nil, Stat: procStat{ StartTime: uint64(now.Add(-20 * time.Minute).Unix()), + RSS: 500 * 1024 * 1024, }, Ports: []uint16{80, 8080}, }, @@ -95,6 +96,7 @@ func Test_telemetrySender(t *testing.T) { Ports: []uint16{80, 8080}, PID: 99, CommandLine: []string{"test-service", "--args"}, + RSSMemory: 500 * 1024 * 1024, }, }, { @@ -114,6 +116,7 @@ func Test_telemetrySender(t *testing.T) { Ports: []uint16{80, 8080}, PID: 99, CommandLine: []string{"test-service", "--args"}, + RSSMemory: 500 * 1024 * 1024, }, }, { @@ -133,6 +136,7 @@ func Test_telemetrySender(t *testing.T) { Ports: []uint16{80, 8080}, PID: 99, CommandLine: []string{"test-service", "--args"}, + RSSMemory: 500 * 1024 * 1024, }, }, } diff --git a/pkg/collector/corechecks/servicediscovery/impl_linux.go b/pkg/collector/corechecks/servicediscovery/impl_linux.go index 3b5ac8452d451..dc8b62e55d8bf 100644 --- a/pkg/collector/corechecks/servicediscovery/impl_linux.go +++ b/pkg/collector/corechecks/servicediscovery/impl_linux.go @@ -108,6 +108,12 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { } } + // The endpoint could be refactored in the future to return a map to avoid this. + serviceMap := make(map[int]*model.Service, len(response.Services)) + for _, service := range response.Services { + serviceMap[service.PID] = &service + } + events := serviceEvents{} now := li.time.Now() @@ -115,8 +121,9 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { // potentialServices contains processes that we scanned in the previous iteration and had open ports. // we check if they are still alive in this iteration, and if so, we send a start-service telemetry event. for pid, svc := range li.potentialServices { - if _, ok := procs[pid]; ok { + if service, ok := serviceMap[pid]; ok { svc.LastHeartbeat = now + svc.process.Stat.RSS = service.RSS li.aliveServices[pid] = svc events.start = append(events.start, *svc) } @@ -161,18 +168,19 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { // check if services previously marked as alive still are. for pid, svc := range li.aliveServices { - if _, ok := procs[pid]; !ok { + if service, ok := serviceMap[pid]; !ok { delete(li.aliveServices, pid) events.stop = append(events.stop, *svc) } else if now.Sub(svc.LastHeartbeat).Truncate(time.Minute) >= heartbeatTime { svc.LastHeartbeat = now + svc.process.Stat.RSS = service.RSS events.heartbeat = append(events.heartbeat, *svc) } } // check if services previously marked as ignore are still alive. for pid := range li.ignoreProcs { - if _, ok := procs[pid]; !ok { + if _, ok := serviceMap[pid]; !ok { delete(li.ignoreProcs, pid) } } diff --git a/pkg/collector/corechecks/servicediscovery/impl_linux_test.go b/pkg/collector/corechecks/servicediscovery/impl_linux_test.go index d51004b64277b..edb002982f318 100644 --- a/pkg/collector/corechecks/servicediscovery/impl_linux_test.go +++ b/pkg/collector/corechecks/servicediscovery/impl_linux_test.go @@ -125,6 +125,15 @@ var ( Ports: []uint16{8080}, APMInstrumentation: string(apm.None), NameSource: "provided", + RSS: 100 * 1024 * 1024, + } + portTCP8080UpdatedRSS = model.Service{ + PID: procTestService1.pid, + Name: "test-service-1", + Ports: []uint16{8080}, + APMInstrumentation: string(apm.None), + NameSource: "provided", + RSS: 200 * 1024 * 1024, } portTCP8080DifferentPID = model.Service{ PID: procTestService1DifferentPID.pid, @@ -254,7 +263,7 @@ func Test_linuxImpl(t *testing.T) { servicesResp: &model.ServicesResponse{Services: []model.Service{ portTCP22, portTCP5000, - portTCP8080, + portTCP8080UpdatedRSS, portTCP8081, }}, time: calcTime(20 * time.Minute), @@ -288,6 +297,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 100 * 1024 * 1024, }, }, { @@ -306,6 +316,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 200 * 1024 * 1024, }, }, { @@ -324,6 +335,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 200 * 1024 * 1024, }, }, { @@ -455,6 +467,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 100 * 1024 * 1024, }, }, { @@ -505,6 +518,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 100 * 1024 * 1024, }, }, }, @@ -580,6 +594,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 100 * 1024 * 1024, }, }, { diff --git a/pkg/collector/corechecks/servicediscovery/model/model.go b/pkg/collector/corechecks/servicediscovery/model/model.go index 2c27e9e1379c9..3b0eadddc0deb 100644 --- a/pkg/collector/corechecks/servicediscovery/model/model.go +++ b/pkg/collector/corechecks/servicediscovery/model/model.go @@ -14,6 +14,7 @@ type Service struct { Ports []uint16 `json:"ports"` APMInstrumentation string `json:"apm_instrumentation"` Language string `json:"language"` + RSS uint64 `json:"rss"` } // ServicesResponse is the response for the system-probe /discovery/services endpoint. diff --git a/pkg/collector/corechecks/servicediscovery/module/impl_linux.go b/pkg/collector/corechecks/servicediscovery/module/impl_linux.go index 3b5602be0b4f2..36459c311495f 100644 --- a/pkg/collector/corechecks/servicediscovery/module/impl_linux.go +++ b/pkg/collector/corechecks/servicediscovery/module/impl_linux.go @@ -289,6 +289,11 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service return nil } + rss, err := getRSS(proc) + if err != nil { + return nil + } + var info *serviceInfo if cached, ok := s.cache[pid]; ok { info = cached @@ -313,6 +318,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service Ports: ports, APMInstrumentation: string(info.apmInstrumentation), Language: string(info.language), + RSS: rss, } } diff --git a/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go b/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go index 6806e878040ca..f9856d82e89f4 100644 --- a/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go +++ b/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go @@ -227,6 +227,7 @@ func TestBasic(t *testing.T) { serviceMap := getServicesMap(t, url) for _, pid := range expectedPIDs { require.Contains(t, serviceMap[pid].Ports, uint16(expectedPorts[pid])) + assertRSS(t, serviceMap[pid]) } } @@ -473,11 +474,28 @@ func TestAPMInstrumentationProvided(t *testing.T) { assert.Contains(collect, portMap, pid) assert.Equal(collect, string(test.language), portMap[pid].Language) assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation) + assertRSS(t, portMap[pid]) }, 30*time.Second, 100*time.Millisecond) }) } } +func assertRSS(t assert.TestingT, svc model.Service) { + proc, err := process.NewProcess(int32(svc.PID)) + if !assert.NoError(t, err) { + return + } + + meminfo, err := proc.MemoryInfo() + if !assert.NoError(t, err) { + return + } + + // Allow a 20% variation to avoid potential flakiness due to difference in + // time of sampling the RSS. + assert.InEpsilon(t, meminfo.RSS, svc.RSS, 0.20) +} + func TestNodeDocker(t *testing.T) { cert, key, err := testutil.GetCertsPaths() require.NoError(t, err) @@ -494,6 +512,7 @@ func TestNodeDocker(t *testing.T) { assert.Contains(collect, svcMap, pid) assert.Equal(collect, "nodejs-https-server", svcMap[pid].Name) assert.Equal(collect, "provided", svcMap[pid].APMInstrumentation) + assertRSS(collect, svcMap[pid]) }, 30*time.Second, 100*time.Millisecond) } @@ -529,6 +548,7 @@ func TestAPMInstrumentationProvidedPython(t *testing.T) { assert.Contains(collect, portMap, pid) assert.Equal(collect, string(language.Python), portMap[pid].Language) assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation) + assertRSS(collect, portMap[pid]) }, 30*time.Second, 100*time.Millisecond) } diff --git a/pkg/collector/corechecks/servicediscovery/module/stat.go b/pkg/collector/corechecks/servicediscovery/module/stat.go new file mode 100644 index 0000000000000..4e12e840d741c --- /dev/null +++ b/pkg/collector/corechecks/servicediscovery/module/stat.go @@ -0,0 +1,49 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +//go:build linux + +package module + +import ( + "errors" + "os" + "strconv" + "strings" + + "github.com/shirou/gopsutil/v3/process" + + "github.com/DataDog/datadog-agent/pkg/util/kernel" +) + +// pageSize stores the page size of the system in bytes, since the values in +// statm are in pages. +var pageSize = uint64(os.Getpagesize()) + +// getRSS returns the RSS for the process, in bytes. Compare MemoryInfo() in +// gopsutil which does the same thing but which parses several other fields +// which we're not interested in. +func getRSS(proc *process.Process) (uint64, error) { + statmPath := kernel.HostProc(strconv.Itoa(int(proc.Pid)), "statm") + + // This file is very small so just read it fully. + contents, err := os.ReadFile(statmPath) + if err != nil { + return 0, err + } + + // See proc(5) for a description of the format of statm and the fields. + fields := strings.Split(string(contents), " ") + if len(fields) < 6 { + return 0, errors.New("invalid statm") + } + + rssPages, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil { + return 0, err + } + + return rssPages * pageSize, nil +} diff --git a/pkg/collector/corechecks/servicediscovery/servicediscovery.go b/pkg/collector/corechecks/servicediscovery/servicediscovery.go index 9578aa783ea47..ac63fe1a65f83 100644 --- a/pkg/collector/corechecks/servicediscovery/servicediscovery.go +++ b/pkg/collector/corechecks/servicediscovery/servicediscovery.go @@ -41,6 +41,7 @@ type serviceInfo struct { type procStat struct { StartTime uint64 + RSS uint64 } type processInfo struct { diff --git a/test/fakeintake/aggregator/servicediscoveryAggregator.go b/test/fakeintake/aggregator/servicediscoveryAggregator.go index b834f08fc5156..3ee0a522e01ec 100644 --- a/test/fakeintake/aggregator/servicediscoveryAggregator.go +++ b/test/fakeintake/aggregator/servicediscoveryAggregator.go @@ -30,6 +30,7 @@ type ServiceDiscoveryPayload struct { LastSeen int64 `json:"last_seen"` APMInstrumentation string `json:"apm_instrumentation"` ServiceNameSource string `json:"service_name_source"` + RSSMemory uint64 `json:"rss_memory"` } `json:"payload"` } diff --git a/test/new-e2e/tests/discovery/linux_test.go b/test/new-e2e/tests/discovery/linux_test.go index b9fa1d0023a63..2ede187079619 100644 --- a/test/new-e2e/tests/discovery/linux_test.go +++ b/test/new-e2e/tests/discovery/linux_test.go @@ -14,12 +14,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/DataDog/test-infra-definitions/components/datadog/agentparams" + "github.com/DataDog/datadog-agent/test/fakeintake/aggregator" "github.com/DataDog/datadog-agent/test/new-e2e/pkg/components" "github.com/DataDog/datadog-agent/test/new-e2e/pkg/e2e" "github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments" awshost "github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments/aws/host" - "github.com/DataDog/test-infra-definitions/components/datadog/agentparams" ) //go:embed testdata/config/agent_config.yaml @@ -92,24 +93,28 @@ func (s *linuxTestSuite) TestServiceDiscoveryCheck() { if assert.NotNil(c, found) { assert.Equal(c, "none", found.Payload.APMInstrumentation) assert.Equal(c, "generated", found.Payload.ServiceNameSource) + assert.NotZero(c, found.Payload.RSSMemory) } found = foundMap["node-instrumented"] if assert.NotNil(c, found) { assert.Equal(c, "provided", found.Payload.APMInstrumentation) assert.Equal(c, "generated", found.Payload.ServiceNameSource) + assert.NotZero(c, found.Payload.RSSMemory) } found = foundMap["python.server"] if assert.NotNil(c, found) { assert.Equal(c, "none", found.Payload.APMInstrumentation) assert.Equal(c, "generated", found.Payload.ServiceNameSource) + assert.NotZero(c, found.Payload.RSSMemory) } found = foundMap["python.instrumented"] if assert.NotNil(c, found) { assert.Equal(c, "provided", found.Payload.APMInstrumentation) assert.Equal(c, "generated", found.Payload.ServiceNameSource) + assert.NotZero(c, found.Payload.RSSMemory) } assert.Contains(c, foundMap, "json-server")