From c93a616f8b86b8d86a17fec1c24956e2b9c294e4 Mon Sep 17 00:00:00 2001 From: Vincent Whitchurch Date: Fri, 30 Aug 2024 10:51:30 +0200 Subject: [PATCH 1/3] discovery: Report RSS from system-probe --- .../servicediscovery/model/model.go | 1 + .../servicediscovery/module/impl_linux.go | 6 +++ .../module/impl_linux_test.go | 20 ++++++++ .../servicediscovery/module/stat.go | 49 +++++++++++++++++++ 4 files changed, 76 insertions(+) create mode 100644 pkg/collector/corechecks/servicediscovery/module/stat.go diff --git a/pkg/collector/corechecks/servicediscovery/model/model.go b/pkg/collector/corechecks/servicediscovery/model/model.go index 2c27e9e1379c9..3b0eadddc0deb 100644 --- a/pkg/collector/corechecks/servicediscovery/model/model.go +++ b/pkg/collector/corechecks/servicediscovery/model/model.go @@ -14,6 +14,7 @@ type Service struct { Ports []uint16 `json:"ports"` APMInstrumentation string `json:"apm_instrumentation"` Language string `json:"language"` + RSS uint64 `json:"rss"` } // ServicesResponse is the response for the system-probe /discovery/services endpoint. diff --git a/pkg/collector/corechecks/servicediscovery/module/impl_linux.go b/pkg/collector/corechecks/servicediscovery/module/impl_linux.go index 3b5602be0b4f2..36459c311495f 100644 --- a/pkg/collector/corechecks/servicediscovery/module/impl_linux.go +++ b/pkg/collector/corechecks/servicediscovery/module/impl_linux.go @@ -289,6 +289,11 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service return nil } + rss, err := getRSS(proc) + if err != nil { + return nil + } + var info *serviceInfo if cached, ok := s.cache[pid]; ok { info = cached @@ -313,6 +318,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service Ports: ports, APMInstrumentation: string(info.apmInstrumentation), Language: string(info.language), + RSS: rss, } } diff --git a/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go b/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go index 6806e878040ca..f9856d82e89f4 100644 --- a/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go +++ b/pkg/collector/corechecks/servicediscovery/module/impl_linux_test.go @@ -227,6 +227,7 @@ func TestBasic(t *testing.T) { serviceMap := getServicesMap(t, url) for _, pid := range expectedPIDs { require.Contains(t, serviceMap[pid].Ports, uint16(expectedPorts[pid])) + assertRSS(t, serviceMap[pid]) } } @@ -473,11 +474,28 @@ func TestAPMInstrumentationProvided(t *testing.T) { assert.Contains(collect, portMap, pid) assert.Equal(collect, string(test.language), portMap[pid].Language) assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation) + assertRSS(t, portMap[pid]) }, 30*time.Second, 100*time.Millisecond) }) } } +func assertRSS(t assert.TestingT, svc model.Service) { + proc, err := process.NewProcess(int32(svc.PID)) + if !assert.NoError(t, err) { + return + } + + meminfo, err := proc.MemoryInfo() + if !assert.NoError(t, err) { + return + } + + // Allow a 20% variation to avoid potential flakiness due to difference in + // time of sampling the RSS. + assert.InEpsilon(t, meminfo.RSS, svc.RSS, 0.20) +} + func TestNodeDocker(t *testing.T) { cert, key, err := testutil.GetCertsPaths() require.NoError(t, err) @@ -494,6 +512,7 @@ func TestNodeDocker(t *testing.T) { assert.Contains(collect, svcMap, pid) assert.Equal(collect, "nodejs-https-server", svcMap[pid].Name) assert.Equal(collect, "provided", svcMap[pid].APMInstrumentation) + assertRSS(collect, svcMap[pid]) }, 30*time.Second, 100*time.Millisecond) } @@ -529,6 +548,7 @@ func TestAPMInstrumentationProvidedPython(t *testing.T) { assert.Contains(collect, portMap, pid) assert.Equal(collect, string(language.Python), portMap[pid].Language) assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation) + assertRSS(collect, portMap[pid]) }, 30*time.Second, 100*time.Millisecond) } diff --git a/pkg/collector/corechecks/servicediscovery/module/stat.go b/pkg/collector/corechecks/servicediscovery/module/stat.go new file mode 100644 index 0000000000000..4e12e840d741c --- /dev/null +++ b/pkg/collector/corechecks/servicediscovery/module/stat.go @@ -0,0 +1,49 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +//go:build linux + +package module + +import ( + "errors" + "os" + "strconv" + "strings" + + "github.com/shirou/gopsutil/v3/process" + + "github.com/DataDog/datadog-agent/pkg/util/kernel" +) + +// pageSize stores the page size of the system in bytes, since the values in +// statm are in pages. +var pageSize = uint64(os.Getpagesize()) + +// getRSS returns the RSS for the process, in bytes. Compare MemoryInfo() in +// gopsutil which does the same thing but which parses several other fields +// which we're not interested in. +func getRSS(proc *process.Process) (uint64, error) { + statmPath := kernel.HostProc(strconv.Itoa(int(proc.Pid)), "statm") + + // This file is very small so just read it fully. + contents, err := os.ReadFile(statmPath) + if err != nil { + return 0, err + } + + // See proc(5) for a description of the format of statm and the fields. + fields := strings.Split(string(contents), " ") + if len(fields) < 6 { + return 0, errors.New("invalid statm") + } + + rssPages, err := strconv.ParseUint(fields[1], 10, 64) + if err != nil { + return 0, err + } + + return rssPages * pageSize, nil +} From 96a6761a52c3186cc054ec097481691ebe1f1d4c Mon Sep 17 00:00:00 2001 From: Vincent Whitchurch Date: Fri, 30 Aug 2024 13:55:54 +0200 Subject: [PATCH 2/3] discovery: Use service list for aliveness check Currently, the agent check uses the all process list to check if the process is alive or not, but we can use the service list instead, since that also considers open ports. So if a process closes its listening port the service is no longer alive. This also allows to use the extra fields received from system-probe such as RSS and allows the proc usage in the check to be eliminated in the future. --- .../corechecks/servicediscovery/impl_linux.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/collector/corechecks/servicediscovery/impl_linux.go b/pkg/collector/corechecks/servicediscovery/impl_linux.go index 3b5ac8452d451..c66a2f969339e 100644 --- a/pkg/collector/corechecks/servicediscovery/impl_linux.go +++ b/pkg/collector/corechecks/servicediscovery/impl_linux.go @@ -108,6 +108,12 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { } } + // The endpoint could be refactored in the future to return a map to avoid this. + serviceMap := make(map[int]*model.Service, len(response.Services)) + for _, service := range response.Services { + serviceMap[service.PID] = &service + } + events := serviceEvents{} now := li.time.Now() @@ -115,7 +121,7 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { // potentialServices contains processes that we scanned in the previous iteration and had open ports. // we check if they are still alive in this iteration, and if so, we send a start-service telemetry event. for pid, svc := range li.potentialServices { - if _, ok := procs[pid]; ok { + if _, ok := serviceMap[pid]; ok { svc.LastHeartbeat = now li.aliveServices[pid] = svc events.start = append(events.start, *svc) @@ -161,7 +167,7 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { // check if services previously marked as alive still are. for pid, svc := range li.aliveServices { - if _, ok := procs[pid]; !ok { + if _, ok := serviceMap[pid]; !ok { delete(li.aliveServices, pid) events.stop = append(events.stop, *svc) } else if now.Sub(svc.LastHeartbeat).Truncate(time.Minute) >= heartbeatTime { @@ -172,7 +178,7 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { // check if services previously marked as ignore are still alive. for pid := range li.ignoreProcs { - if _, ok := procs[pid]; !ok { + if _, ok := serviceMap[pid]; !ok { delete(li.ignoreProcs, pid) } } From 9626bea630f384d01b52037be2a8dfbcab7b753a Mon Sep 17 00:00:00 2001 From: Vincent Whitchurch Date: Fri, 30 Aug 2024 13:56:24 +0200 Subject: [PATCH 3/3] discovery: Report RSS in event Pass along the information about services' RSS memory received from system-probe to the final telemetry events. --- .../corechecks/servicediscovery/events.go | 2 ++ .../corechecks/servicediscovery/events_test.go | 4 ++++ .../corechecks/servicediscovery/impl_linux.go | 6 ++++-- .../servicediscovery/impl_linux_test.go | 17 ++++++++++++++++- .../servicediscovery/servicediscovery.go | 1 + .../aggregator/servicediscoveryAggregator.go | 1 + test/new-e2e/tests/discovery/linux_test.go | 7 ++++++- 7 files changed, 34 insertions(+), 4 deletions(-) diff --git a/pkg/collector/corechecks/servicediscovery/events.go b/pkg/collector/corechecks/servicediscovery/events.go index 996268a0067a1..7a1bd49dcc04d 100644 --- a/pkg/collector/corechecks/servicediscovery/events.go +++ b/pkg/collector/corechecks/servicediscovery/events.go @@ -39,6 +39,7 @@ type eventPayload struct { Ports []uint16 `json:"ports"` PID int `json:"pid"` CommandLine []string `json:"command_line"` + RSSMemory uint64 `json:"rss_memory"` } type event struct { @@ -73,6 +74,7 @@ func (ts *telemetrySender) newEvent(t eventType, svc serviceInfo) *event { Ports: svc.process.Ports, PID: svc.process.PID, CommandLine: svc.process.CmdLine, + RSSMemory: svc.process.Stat.RSS, }, } } diff --git a/pkg/collector/corechecks/servicediscovery/events_test.go b/pkg/collector/corechecks/servicediscovery/events_test.go index ec04b6318be2a..c3803fb1ce9ad 100644 --- a/pkg/collector/corechecks/servicediscovery/events_test.go +++ b/pkg/collector/corechecks/servicediscovery/events_test.go @@ -60,6 +60,7 @@ func Test_telemetrySender(t *testing.T) { Env: nil, Stat: procStat{ StartTime: uint64(now.Add(-20 * time.Minute).Unix()), + RSS: 500 * 1024 * 1024, }, Ports: []uint16{80, 8080}, }, @@ -95,6 +96,7 @@ func Test_telemetrySender(t *testing.T) { Ports: []uint16{80, 8080}, PID: 99, CommandLine: []string{"test-service", "--args"}, + RSSMemory: 500 * 1024 * 1024, }, }, { @@ -114,6 +116,7 @@ func Test_telemetrySender(t *testing.T) { Ports: []uint16{80, 8080}, PID: 99, CommandLine: []string{"test-service", "--args"}, + RSSMemory: 500 * 1024 * 1024, }, }, { @@ -133,6 +136,7 @@ func Test_telemetrySender(t *testing.T) { Ports: []uint16{80, 8080}, PID: 99, CommandLine: []string{"test-service", "--args"}, + RSSMemory: 500 * 1024 * 1024, }, }, } diff --git a/pkg/collector/corechecks/servicediscovery/impl_linux.go b/pkg/collector/corechecks/servicediscovery/impl_linux.go index c66a2f969339e..dc8b62e55d8bf 100644 --- a/pkg/collector/corechecks/servicediscovery/impl_linux.go +++ b/pkg/collector/corechecks/servicediscovery/impl_linux.go @@ -121,8 +121,9 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { // potentialServices contains processes that we scanned in the previous iteration and had open ports. // we check if they are still alive in this iteration, and if so, we send a start-service telemetry event. for pid, svc := range li.potentialServices { - if _, ok := serviceMap[pid]; ok { + if service, ok := serviceMap[pid]; ok { svc.LastHeartbeat = now + svc.process.Stat.RSS = service.RSS li.aliveServices[pid] = svc events.start = append(events.start, *svc) } @@ -167,11 +168,12 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) { // check if services previously marked as alive still are. for pid, svc := range li.aliveServices { - if _, ok := serviceMap[pid]; !ok { + if service, ok := serviceMap[pid]; !ok { delete(li.aliveServices, pid) events.stop = append(events.stop, *svc) } else if now.Sub(svc.LastHeartbeat).Truncate(time.Minute) >= heartbeatTime { svc.LastHeartbeat = now + svc.process.Stat.RSS = service.RSS events.heartbeat = append(events.heartbeat, *svc) } } diff --git a/pkg/collector/corechecks/servicediscovery/impl_linux_test.go b/pkg/collector/corechecks/servicediscovery/impl_linux_test.go index d51004b64277b..edb002982f318 100644 --- a/pkg/collector/corechecks/servicediscovery/impl_linux_test.go +++ b/pkg/collector/corechecks/servicediscovery/impl_linux_test.go @@ -125,6 +125,15 @@ var ( Ports: []uint16{8080}, APMInstrumentation: string(apm.None), NameSource: "provided", + RSS: 100 * 1024 * 1024, + } + portTCP8080UpdatedRSS = model.Service{ + PID: procTestService1.pid, + Name: "test-service-1", + Ports: []uint16{8080}, + APMInstrumentation: string(apm.None), + NameSource: "provided", + RSS: 200 * 1024 * 1024, } portTCP8080DifferentPID = model.Service{ PID: procTestService1DifferentPID.pid, @@ -254,7 +263,7 @@ func Test_linuxImpl(t *testing.T) { servicesResp: &model.ServicesResponse{Services: []model.Service{ portTCP22, portTCP5000, - portTCP8080, + portTCP8080UpdatedRSS, portTCP8081, }}, time: calcTime(20 * time.Minute), @@ -288,6 +297,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 100 * 1024 * 1024, }, }, { @@ -306,6 +316,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 200 * 1024 * 1024, }, }, { @@ -324,6 +335,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 200 * 1024 * 1024, }, }, { @@ -455,6 +467,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 100 * 1024 * 1024, }, }, { @@ -505,6 +518,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 100 * 1024 * 1024, }, }, }, @@ -580,6 +594,7 @@ func Test_linuxImpl(t *testing.T) { CommandLine: []string{"test-service-1"}, APMInstrumentation: "none", ServiceNameSource: "provided", + RSSMemory: 100 * 1024 * 1024, }, }, { diff --git a/pkg/collector/corechecks/servicediscovery/servicediscovery.go b/pkg/collector/corechecks/servicediscovery/servicediscovery.go index 9578aa783ea47..ac63fe1a65f83 100644 --- a/pkg/collector/corechecks/servicediscovery/servicediscovery.go +++ b/pkg/collector/corechecks/servicediscovery/servicediscovery.go @@ -41,6 +41,7 @@ type serviceInfo struct { type procStat struct { StartTime uint64 + RSS uint64 } type processInfo struct { diff --git a/test/fakeintake/aggregator/servicediscoveryAggregator.go b/test/fakeintake/aggregator/servicediscoveryAggregator.go index b834f08fc5156..3ee0a522e01ec 100644 --- a/test/fakeintake/aggregator/servicediscoveryAggregator.go +++ b/test/fakeintake/aggregator/servicediscoveryAggregator.go @@ -30,6 +30,7 @@ type ServiceDiscoveryPayload struct { LastSeen int64 `json:"last_seen"` APMInstrumentation string `json:"apm_instrumentation"` ServiceNameSource string `json:"service_name_source"` + RSSMemory uint64 `json:"rss_memory"` } `json:"payload"` } diff --git a/test/new-e2e/tests/discovery/linux_test.go b/test/new-e2e/tests/discovery/linux_test.go index b9fa1d0023a63..2ede187079619 100644 --- a/test/new-e2e/tests/discovery/linux_test.go +++ b/test/new-e2e/tests/discovery/linux_test.go @@ -14,12 +14,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/DataDog/test-infra-definitions/components/datadog/agentparams" + "github.com/DataDog/datadog-agent/test/fakeintake/aggregator" "github.com/DataDog/datadog-agent/test/new-e2e/pkg/components" "github.com/DataDog/datadog-agent/test/new-e2e/pkg/e2e" "github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments" awshost "github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments/aws/host" - "github.com/DataDog/test-infra-definitions/components/datadog/agentparams" ) //go:embed testdata/config/agent_config.yaml @@ -92,24 +93,28 @@ func (s *linuxTestSuite) TestServiceDiscoveryCheck() { if assert.NotNil(c, found) { assert.Equal(c, "none", found.Payload.APMInstrumentation) assert.Equal(c, "generated", found.Payload.ServiceNameSource) + assert.NotZero(c, found.Payload.RSSMemory) } found = foundMap["node-instrumented"] if assert.NotNil(c, found) { assert.Equal(c, "provided", found.Payload.APMInstrumentation) assert.Equal(c, "generated", found.Payload.ServiceNameSource) + assert.NotZero(c, found.Payload.RSSMemory) } found = foundMap["python.server"] if assert.NotNil(c, found) { assert.Equal(c, "none", found.Payload.APMInstrumentation) assert.Equal(c, "generated", found.Payload.ServiceNameSource) + assert.NotZero(c, found.Payload.RSSMemory) } found = foundMap["python.instrumented"] if assert.NotNil(c, found) { assert.Equal(c, "provided", found.Payload.APMInstrumentation) assert.Equal(c, "generated", found.Payload.ServiceNameSource) + assert.NotZero(c, found.Payload.RSSMemory) } assert.Contains(c, foundMap, "json-server")