Skip to content

Commit

Permalink
discovery: Report RSS (#28931)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitkyrka authored Sep 2, 2024
1 parent 2e7d2bb commit 20d20f9
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 5 deletions.
2 changes: 2 additions & 0 deletions pkg/collector/corechecks/servicediscovery/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type eventPayload struct {
Ports []uint16 `json:"ports"`
PID int `json:"pid"`
CommandLine []string `json:"command_line"`
RSSMemory uint64 `json:"rss_memory"`
}

type event struct {
Expand Down Expand Up @@ -73,6 +74,7 @@ func (ts *telemetrySender) newEvent(t eventType, svc serviceInfo) *event {
Ports: svc.process.Ports,
PID: svc.process.PID,
CommandLine: svc.process.CmdLine,
RSSMemory: svc.process.Stat.RSS,
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/collector/corechecks/servicediscovery/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func Test_telemetrySender(t *testing.T) {
Env: nil,
Stat: procStat{
StartTime: uint64(now.Add(-20 * time.Minute).Unix()),
RSS: 500 * 1024 * 1024,
},
Ports: []uint16{80, 8080},
},
Expand Down Expand Up @@ -95,6 +96,7 @@ func Test_telemetrySender(t *testing.T) {
Ports: []uint16{80, 8080},
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
},
},
{
Expand All @@ -114,6 +116,7 @@ func Test_telemetrySender(t *testing.T) {
Ports: []uint16{80, 8080},
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
},
},
{
Expand All @@ -133,6 +136,7 @@ func Test_telemetrySender(t *testing.T) {
Ports: []uint16{80, 8080},
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
},
},
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/collector/corechecks/servicediscovery/impl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,22 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) {
}
}

// The endpoint could be refactored in the future to return a map to avoid this.
serviceMap := make(map[int]*model.Service, len(response.Services))
for _, service := range response.Services {
serviceMap[service.PID] = &service
}

events := serviceEvents{}

now := li.time.Now()

// potentialServices contains processes that we scanned in the previous iteration and had open ports.
// we check if they are still alive in this iteration, and if so, we send a start-service telemetry event.
for pid, svc := range li.potentialServices {
if _, ok := procs[pid]; ok {
if service, ok := serviceMap[pid]; ok {
svc.LastHeartbeat = now
svc.process.Stat.RSS = service.RSS
li.aliveServices[pid] = svc
events.start = append(events.start, *svc)
}
Expand Down Expand Up @@ -161,18 +168,19 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) {

// check if services previously marked as alive still are.
for pid, svc := range li.aliveServices {
if _, ok := procs[pid]; !ok {
if service, ok := serviceMap[pid]; !ok {
delete(li.aliveServices, pid)
events.stop = append(events.stop, *svc)
} else if now.Sub(svc.LastHeartbeat).Truncate(time.Minute) >= heartbeatTime {
svc.LastHeartbeat = now
svc.process.Stat.RSS = service.RSS
events.heartbeat = append(events.heartbeat, *svc)
}
}

// check if services previously marked as ignore are still alive.
for pid := range li.ignoreProcs {
if _, ok := procs[pid]; !ok {
if _, ok := serviceMap[pid]; !ok {
delete(li.ignoreProcs, pid)
}
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/collector/corechecks/servicediscovery/impl_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ var (
Ports: []uint16{8080},
APMInstrumentation: string(apm.None),
NameSource: "provided",
RSS: 100 * 1024 * 1024,
}
portTCP8080UpdatedRSS = model.Service{
PID: procTestService1.pid,
Name: "test-service-1",
Ports: []uint16{8080},
APMInstrumentation: string(apm.None),
NameSource: "provided",
RSS: 200 * 1024 * 1024,
}
portTCP8080DifferentPID = model.Service{
PID: procTestService1DifferentPID.pid,
Expand Down Expand Up @@ -254,7 +263,7 @@ func Test_linuxImpl(t *testing.T) {
servicesResp: &model.ServicesResponse{Services: []model.Service{
portTCP22,
portTCP5000,
portTCP8080,
portTCP8080UpdatedRSS,
portTCP8081,
}},
time: calcTime(20 * time.Minute),
Expand Down Expand Up @@ -288,6 +297,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 100 * 1024 * 1024,
},
},
{
Expand All @@ -306,6 +316,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 200 * 1024 * 1024,
},
},
{
Expand All @@ -324,6 +335,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 200 * 1024 * 1024,
},
},
{
Expand Down Expand Up @@ -455,6 +467,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 100 * 1024 * 1024,
},
},
{
Expand Down Expand Up @@ -505,6 +518,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 100 * 1024 * 1024,
},
},
},
Expand Down Expand Up @@ -580,6 +594,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 100 * 1024 * 1024,
},
},
{
Expand Down
1 change: 1 addition & 0 deletions pkg/collector/corechecks/servicediscovery/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Service struct {
Ports []uint16 `json:"ports"`
APMInstrumentation string `json:"apm_instrumentation"`
Language string `json:"language"`
RSS uint64 `json:"rss"`
}

// ServicesResponse is the response for the system-probe /discovery/services endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
return nil
}

rss, err := getRSS(proc)
if err != nil {
return nil
}

var info *serviceInfo
if cached, ok := s.cache[pid]; ok {
info = cached
Expand All @@ -313,6 +318,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
Ports: ports,
APMInstrumentation: string(info.apmInstrumentation),
Language: string(info.language),
RSS: rss,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func TestBasic(t *testing.T) {
serviceMap := getServicesMap(t, url)
for _, pid := range expectedPIDs {
require.Contains(t, serviceMap[pid].Ports, uint16(expectedPorts[pid]))
assertRSS(t, serviceMap[pid])
}
}

Expand Down Expand Up @@ -473,11 +474,28 @@ func TestAPMInstrumentationProvided(t *testing.T) {
assert.Contains(collect, portMap, pid)
assert.Equal(collect, string(test.language), portMap[pid].Language)
assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation)
assertRSS(t, portMap[pid])
}, 30*time.Second, 100*time.Millisecond)
})
}
}

// assertRSS checks that the RSS reported in svc is close to the RSS that
// gopsutil reads for the same PID at the time of the call. Failures are
// recorded on t; the function returns early if the process or its memory
// information cannot be obtained.
func assertRSS(t assert.TestingT, svc model.Service) {
	p, err := process.NewProcess(int32(svc.PID))
	if ok := assert.NoError(t, err); !ok {
		return
	}

	mem, err := p.MemoryInfo()
	if ok := assert.NoError(t, err); !ok {
		return
	}

	// The two RSS samples are taken at different moments, so tolerate a
	// relative difference of up to 20% rather than requiring equality; this
	// avoids potential flakiness.
	const tolerance = 0.20
	assert.InEpsilon(t, mem.RSS, svc.RSS, tolerance)
}

func TestNodeDocker(t *testing.T) {
cert, key, err := testutil.GetCertsPaths()
require.NoError(t, err)
Expand All @@ -494,6 +512,7 @@ func TestNodeDocker(t *testing.T) {
assert.Contains(collect, svcMap, pid)
assert.Equal(collect, "nodejs-https-server", svcMap[pid].Name)
assert.Equal(collect, "provided", svcMap[pid].APMInstrumentation)
assertRSS(collect, svcMap[pid])
}, 30*time.Second, 100*time.Millisecond)
}

Expand Down Expand Up @@ -529,6 +548,7 @@ func TestAPMInstrumentationProvidedPython(t *testing.T) {
assert.Contains(collect, portMap, pid)
assert.Equal(collect, string(language.Python), portMap[pid].Language)
assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation)
assertRSS(collect, portMap[pid])
}, 30*time.Second, 100*time.Millisecond)
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/collector/corechecks/servicediscovery/module/stat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux

package module

import (
"errors"
"os"
"strconv"
"strings"

"github.com/shirou/gopsutil/v3/process"

"github.com/DataDog/datadog-agent/pkg/util/kernel"
)

// pageSize stores the page size of the system in bytes, as returned by
// os.Getpagesize. The values in statm are in pages, so getRSS multiplies the
// parsed page count by this value to obtain bytes.
var pageSize = uint64(os.Getpagesize())

// getRSS reports the resident set size of proc, in bytes. It reads the
// process's statm file and parses only the field it needs; MemoryInfo() in
// gopsutil does the same thing but also parses several other fields which
// are of no interest here.
//
// An error is returned if the file cannot be read, if it has fewer than six
// space-separated fields, or if the RSS field is not an unsigned integer.
func getRSS(proc *process.Process) (uint64, error) {
	pid := strconv.Itoa(int(proc.Pid))

	// The file is very small, so reading it in one go is fine.
	raw, err := os.ReadFile(kernel.HostProc(pid, "statm"))
	if err != nil {
		return 0, err
	}

	// The format of statm and its fields is described in proc(5). Values are
	// expressed in pages; the second field is the one used as the RSS.
	parts := strings.Split(string(raw), " ")
	if len(parts) < 6 {
		return 0, errors.New("invalid statm")
	}

	pages, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return 0, err
	}

	return pages * pageSize, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type serviceInfo struct {

// procStat holds per-process statistics attached to a discovered service's
// process information.
type procStat struct {
// StartTime is the start time of the process.
// NOTE(review): the tests fill this from time.Time.Unix(), so it is
// presumably seconds since the Unix epoch — confirm against the producer.
StartTime uint64
// RSS is the resident set size of the process in bytes. It is refreshed from
// the discovery endpoint's response and reported as rss_memory in events.
RSS uint64
}

type processInfo struct {
Expand Down
1 change: 1 addition & 0 deletions test/fakeintake/aggregator/servicediscoveryAggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ServiceDiscoveryPayload struct {
LastSeen int64 `json:"last_seen"`
APMInstrumentation string `json:"apm_instrumentation"`
ServiceNameSource string `json:"service_name_source"`
RSSMemory uint64 `json:"rss_memory"`
} `json:"payload"`
}

Expand Down
7 changes: 6 additions & 1 deletion test/new-e2e/tests/discovery/linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/test-infra-definitions/components/datadog/agentparams"

"github.com/DataDog/datadog-agent/test/fakeintake/aggregator"
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/components"
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/e2e"
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments"
awshost "github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments/aws/host"
"github.com/DataDog/test-infra-definitions/components/datadog/agentparams"
)

//go:embed testdata/config/agent_config.yaml
Expand Down Expand Up @@ -92,24 +93,28 @@ func (s *linuxTestSuite) TestServiceDiscoveryCheck() {
if assert.NotNil(c, found) {
assert.Equal(c, "none", found.Payload.APMInstrumentation)
assert.Equal(c, "generated", found.Payload.ServiceNameSource)
assert.NotZero(c, found.Payload.RSSMemory)
}

found = foundMap["node-instrumented"]
if assert.NotNil(c, found) {
assert.Equal(c, "provided", found.Payload.APMInstrumentation)
assert.Equal(c, "generated", found.Payload.ServiceNameSource)
assert.NotZero(c, found.Payload.RSSMemory)
}

found = foundMap["python.server"]
if assert.NotNil(c, found) {
assert.Equal(c, "none", found.Payload.APMInstrumentation)
assert.Equal(c, "generated", found.Payload.ServiceNameSource)
assert.NotZero(c, found.Payload.RSSMemory)
}

found = foundMap["python.instrumented"]
if assert.NotNil(c, found) {
assert.Equal(c, "provided", found.Payload.APMInstrumentation)
assert.Equal(c, "generated", found.Payload.ServiceNameSource)
assert.NotZero(c, found.Payload.RSSMemory)
}

assert.Contains(c, foundMap, "json-server")
Expand Down

0 comments on commit 20d20f9

Please sign in to comment.