Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery: Report RSS #28931

Merged
merged 3 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/collector/corechecks/servicediscovery/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type eventPayload struct {
Ports []uint16 `json:"ports"`
PID int `json:"pid"`
CommandLine []string `json:"command_line"`
RSSMemory uint64 `json:"rss_memory"`
}

type event struct {
Expand Down Expand Up @@ -73,6 +74,7 @@ func (ts *telemetrySender) newEvent(t eventType, svc serviceInfo) *event {
Ports: svc.process.Ports,
PID: svc.process.PID,
CommandLine: svc.process.CmdLine,
RSSMemory: svc.process.Stat.RSS,
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/collector/corechecks/servicediscovery/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func Test_telemetrySender(t *testing.T) {
Env: nil,
Stat: procStat{
StartTime: uint64(now.Add(-20 * time.Minute).Unix()),
RSS: 500 * 1024 * 1024,
},
Ports: []uint16{80, 8080},
},
Expand Down Expand Up @@ -95,6 +96,7 @@ func Test_telemetrySender(t *testing.T) {
Ports: []uint16{80, 8080},
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
},
},
{
Expand All @@ -114,6 +116,7 @@ func Test_telemetrySender(t *testing.T) {
Ports: []uint16{80, 8080},
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
},
},
{
Expand All @@ -133,6 +136,7 @@ func Test_telemetrySender(t *testing.T) {
Ports: []uint16{80, 8080},
PID: 99,
CommandLine: []string{"test-service", "--args"},
RSSMemory: 500 * 1024 * 1024,
},
},
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/collector/corechecks/servicediscovery/impl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,22 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) {
}
}

// The endpoint could be refactored in the future to return a map to avoid this.
serviceMap := make(map[int]*model.Service, len(response.Services))
for _, service := range response.Services {
serviceMap[service.PID] = &service
}

events := serviceEvents{}

now := li.time.Now()

// potentialServices contains processes that we scanned in the previous iteration and that had open ports.
// We check if they are still alive in this iteration and, if so, send a start-service telemetry event.
for pid, svc := range li.potentialServices {
if _, ok := procs[pid]; ok {
if service, ok := serviceMap[pid]; ok {
svc.LastHeartbeat = now
svc.process.Stat.RSS = service.RSS
li.aliveServices[pid] = svc
events.start = append(events.start, *svc)
}
Expand Down Expand Up @@ -161,18 +168,19 @@ func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) {

// check if services previously marked as alive still are.
for pid, svc := range li.aliveServices {
if _, ok := procs[pid]; !ok {
if service, ok := serviceMap[pid]; !ok {
delete(li.aliveServices, pid)
events.stop = append(events.stop, *svc)
} else if now.Sub(svc.LastHeartbeat).Truncate(time.Minute) >= heartbeatTime {
svc.LastHeartbeat = now
svc.process.Stat.RSS = service.RSS
events.heartbeat = append(events.heartbeat, *svc)
}
}

// check if services previously marked as ignored are still alive.
for pid := range li.ignoreProcs {
if _, ok := procs[pid]; !ok {
if _, ok := serviceMap[pid]; !ok {
delete(li.ignoreProcs, pid)
}
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/collector/corechecks/servicediscovery/impl_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ var (
Ports: []uint16{8080},
APMInstrumentation: string(apm.None),
NameSource: "provided",
RSS: 100 * 1024 * 1024,
}
portTCP8080UpdatedRSS = model.Service{
PID: procTestService1.pid,
Name: "test-service-1",
Ports: []uint16{8080},
APMInstrumentation: string(apm.None),
NameSource: "provided",
RSS: 200 * 1024 * 1024,
}
portTCP8080DifferentPID = model.Service{
PID: procTestService1DifferentPID.pid,
Expand Down Expand Up @@ -254,7 +263,7 @@ func Test_linuxImpl(t *testing.T) {
servicesResp: &model.ServicesResponse{Services: []model.Service{
portTCP22,
portTCP5000,
portTCP8080,
portTCP8080UpdatedRSS,
portTCP8081,
}},
time: calcTime(20 * time.Minute),
Expand Down Expand Up @@ -288,6 +297,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 100 * 1024 * 1024,
},
},
{
Expand All @@ -306,6 +316,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 200 * 1024 * 1024,
},
},
{
Expand All @@ -324,6 +335,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 200 * 1024 * 1024,
},
},
{
Expand Down Expand Up @@ -455,6 +467,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 100 * 1024 * 1024,
},
},
{
Expand Down Expand Up @@ -505,6 +518,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 100 * 1024 * 1024,
},
},
},
Expand Down Expand Up @@ -580,6 +594,7 @@ func Test_linuxImpl(t *testing.T) {
CommandLine: []string{"test-service-1"},
APMInstrumentation: "none",
ServiceNameSource: "provided",
RSSMemory: 100 * 1024 * 1024,
},
},
{
Expand Down
1 change: 1 addition & 0 deletions pkg/collector/corechecks/servicediscovery/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Service struct {
Ports []uint16 `json:"ports"`
APMInstrumentation string `json:"apm_instrumentation"`
Language string `json:"language"`
RSS uint64 `json:"rss"`
}

// ServicesResponse is the response for the system-probe /discovery/services endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
return nil
}

rss, err := getRSS(proc)
if err != nil {
return nil
}

var info *serviceInfo
if cached, ok := s.cache[pid]; ok {
info = cached
Expand All @@ -313,6 +318,7 @@ func (s *discovery) getService(context parsingContext, pid int32) *model.Service
Ports: ports,
APMInstrumentation: string(info.apmInstrumentation),
Language: string(info.language),
RSS: rss,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func TestBasic(t *testing.T) {
serviceMap := getServicesMap(t, url)
for _, pid := range expectedPIDs {
require.Contains(t, serviceMap[pid].Ports, uint16(expectedPorts[pid]))
assertRSS(t, serviceMap[pid])
}
}

Expand Down Expand Up @@ -473,11 +474,28 @@ func TestAPMInstrumentationProvided(t *testing.T) {
assert.Contains(collect, portMap, pid)
assert.Equal(collect, string(test.language), portMap[pid].Language)
assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation)
assertRSS(t, portMap[pid])
}, 30*time.Second, 100*time.Millisecond)
})
}
}

// assertRSS verifies that the RSS reported in svc is close to the RSS that
// gopsutil reads for the process with svc.PID at the time of the call.
//
// Failures are reported through t; if the process cannot be opened or its
// memory information cannot be read, the failure is recorded and the
// comparison is skipped.
func assertRSS(t assert.TestingT, svc model.Service) {
	// The two RSS samples are taken at different moments, so a relative
	// difference of up to 20% is tolerated to keep the check from flaking.
	const tolerance = 0.20

	p, err := process.NewProcess(int32(svc.PID))
	if ok := assert.NoError(t, err); !ok {
		return
	}

	mem, err := p.MemoryInfo()
	if ok := assert.NoError(t, err); !ok {
		return
	}

	assert.InEpsilon(t, mem.RSS, svc.RSS, tolerance)
}

func TestNodeDocker(t *testing.T) {
cert, key, err := testutil.GetCertsPaths()
require.NoError(t, err)
Expand All @@ -494,6 +512,7 @@ func TestNodeDocker(t *testing.T) {
assert.Contains(collect, svcMap, pid)
assert.Equal(collect, "nodejs-https-server", svcMap[pid].Name)
assert.Equal(collect, "provided", svcMap[pid].APMInstrumentation)
assertRSS(collect, svcMap[pid])
}, 30*time.Second, 100*time.Millisecond)
}

Expand Down Expand Up @@ -529,6 +548,7 @@ func TestAPMInstrumentationProvidedPython(t *testing.T) {
assert.Contains(collect, portMap, pid)
assert.Equal(collect, string(language.Python), portMap[pid].Language)
assert.Equal(collect, string(apm.Provided), portMap[pid].APMInstrumentation)
assertRSS(collect, portMap[pid])
}, 30*time.Second, 100*time.Millisecond)
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/collector/corechecks/servicediscovery/module/stat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux

package module

import (
"errors"
"os"
"strconv"
"strings"

"github.com/shirou/gopsutil/v3/process"

"github.com/DataDog/datadog-agent/pkg/util/kernel"
)

// pageSize is the system's memory page size in bytes, obtained once from
// os.Getpagesize when the package is initialized. The values read from statm
// are expressed in pages, so getRSS multiplies by pageSize to report bytes.
var pageSize = uint64(os.Getpagesize())

// getRSS reports the resident set size of proc in bytes.
//
// It reads only the statm file of the process and extracts the single column
// it needs, whereas gopsutil's MemoryInfo() obtains the same number while also
// parsing several fields that are of no use here.
//
// An error is returned if statm cannot be read, if it has fewer columns than
// expected, or if the resident column is not an unsigned integer.
func getRSS(proc *process.Process) (uint64, error) {
	pid := strconv.Itoa(int(proc.Pid))

	// statm is tiny, so it is simply read in one go.
	raw, err := os.ReadFile(kernel.HostProc(pid, "statm"))
	if err != nil {
		return 0, err
	}

	// The layout of statm is described in proc(5): space-separated columns,
	// with the resident set size (in pages) in the second one.
	columns := strings.Split(string(raw), " ")
	if len(columns) < 6 {
		return 0, errors.New("invalid statm")
	}

	resident, err := strconv.ParseUint(columns[1], 10, 64)
	if err != nil {
		return 0, err
	}

	// Convert from pages to bytes.
	return resident * pageSize, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type serviceInfo struct {

// procStat holds the per-process statistics kept alongside a discovered
// service's process information.
type procStat struct {
// StartTime is the start time of the process.
// NOTE(review): presumably seconds since the Unix epoch, which is how the
// tests in this package build it — confirm against the code that fills it.
StartTime uint64
// RSS is the resident set size of the process in bytes. It is refreshed from
// the RSS reported by the discovery endpoint and is sent as rss_memory in
// telemetry events.
RSS uint64
}

type processInfo struct {
Expand Down
1 change: 1 addition & 0 deletions test/fakeintake/aggregator/servicediscoveryAggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ServiceDiscoveryPayload struct {
LastSeen int64 `json:"last_seen"`
APMInstrumentation string `json:"apm_instrumentation"`
ServiceNameSource string `json:"service_name_source"`
RSSMemory uint64 `json:"rss_memory"`
} `json:"payload"`
}

Expand Down
7 changes: 6 additions & 1 deletion test/new-e2e/tests/discovery/linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/test-infra-definitions/components/datadog/agentparams"

"github.com/DataDog/datadog-agent/test/fakeintake/aggregator"
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/components"
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/e2e"
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments"
awshost "github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments/aws/host"
"github.com/DataDog/test-infra-definitions/components/datadog/agentparams"
)

//go:embed testdata/config/agent_config.yaml
Expand Down Expand Up @@ -92,24 +93,28 @@ func (s *linuxTestSuite) TestServiceDiscoveryCheck() {
if assert.NotNil(c, found) {
assert.Equal(c, "none", found.Payload.APMInstrumentation)
assert.Equal(c, "generated", found.Payload.ServiceNameSource)
assert.NotZero(c, found.Payload.RSSMemory)
}

found = foundMap["node-instrumented"]
if assert.NotNil(c, found) {
assert.Equal(c, "provided", found.Payload.APMInstrumentation)
assert.Equal(c, "generated", found.Payload.ServiceNameSource)
assert.NotZero(c, found.Payload.RSSMemory)
}

found = foundMap["python.server"]
if assert.NotNil(c, found) {
assert.Equal(c, "none", found.Payload.APMInstrumentation)
assert.Equal(c, "generated", found.Payload.ServiceNameSource)
assert.NotZero(c, found.Payload.RSSMemory)
}

found = foundMap["python.instrumented"]
if assert.NotNil(c, found) {
assert.Equal(c, "provided", found.Payload.APMInstrumentation)
assert.Equal(c, "generated", found.Payload.ServiceNameSource)
assert.NotZero(c, found.Payload.RSSMemory)
}

assert.Contains(c, foundMap, "json-server")
Expand Down
Loading