Skip to content

Commit

Permalink
move service discovery to simpler sysprobe client (#31364)
Browse files Browse the repository at this point in the history
  • Loading branch information
brycekahle authored Dec 3, 2024
1 parent 2c77f31 commit 45a9bcf
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 260 deletions.
56 changes: 39 additions & 17 deletions pkg/collector/corechecks/servicediscovery/impl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
package servicediscovery

import (
"encoding/json"
"fmt"
"net/http"
"time"

sysprobeclient "github.com/DataDog/datadog-agent/cmd/system-probe/api/client"
sysconfig "github.com/DataDog/datadog-agent/cmd/system-probe/config"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/servicediscovery/model"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/servicediscovery/servicetype"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
processnet "github.com/DataDog/datadog-agent/pkg/process/net"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand All @@ -24,38 +28,56 @@ func init() {
}

type linuxImpl struct {
getSysProbeClient processnet.SysProbeUtilGetter
time timer
getDiscoveryServices func(client *http.Client) (*model.ServicesResponse, error)
time timer

ignoreCfg map[string]bool

ignoreProcs map[int]bool
aliveServices map[int]*serviceInfo
potentialServices map[int]*serviceInfo

sysProbeClient *http.Client
}

func newLinuxImpl(ignoreCfg map[string]bool) (osImpl, error) {
return &linuxImpl{
getSysProbeClient: processnet.GetRemoteSystemProbeUtil,
time: realTime{},
ignoreCfg: ignoreCfg,
ignoreProcs: make(map[int]bool),
aliveServices: make(map[int]*serviceInfo),
potentialServices: make(map[int]*serviceInfo),
getDiscoveryServices: getDiscoveryServices,
time: realTime{},
ignoreCfg: ignoreCfg,
ignoreProcs: make(map[int]bool),
aliveServices: make(map[int]*serviceInfo),
potentialServices: make(map[int]*serviceInfo),
sysProbeClient: sysprobeclient.Get(pkgconfigsetup.SystemProbe().GetString("system_probe_config.sysprobe_socket")),
}, nil
}

func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) {
socket := pkgconfigsetup.SystemProbe().GetString("system_probe_config.sysprobe_socket")
sysProbe, err := li.getSysProbeClient(socket)
func getDiscoveryServices(client *http.Client) (*model.ServicesResponse, error) {
url := sysprobeclient.ModuleURL(sysconfig.DiscoveryModule, "/services")
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, errWithCode{
err: err,
code: errorCodeSystemProbeConn,
}
return nil, err
}

resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("got non-success status code: url: %s, status_code: %d", req.URL, resp.StatusCode)
}

response, err := sysProbe.GetDiscoveryServices()
res := &model.ServicesResponse{}
if err := json.NewDecoder(resp.Body).Decode(res); err != nil {
return nil, err
}
return res, nil
}

func (li *linuxImpl) DiscoverServices() (*discoveredServices, error) {
response, err := li.getDiscoveryServices(li.sysProbeClient)
if err != nil {
return nil, errWithCode{
err: err,
Expand Down
12 changes: 3 additions & 9 deletions pkg/collector/corechecks/servicediscovery/impl_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package servicediscovery

import (
"cmp"
"net/http"
"testing"
"time"

Expand All @@ -22,8 +23,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/aggregator/mocksender"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/servicediscovery/apm"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/servicediscovery/model"
"github.com/DataDog/datadog-agent/pkg/process/net"
netmocks "github.com/DataDog/datadog-agent/pkg/process/net/mocks"
)

type testProc struct {
Expand Down Expand Up @@ -601,19 +600,14 @@ func Test_linuxImpl(t *testing.T) {
require.NotNil(t, check.os)

for _, cr := range tc.checkRun {
mSysProbe := netmocks.NewSysProbeUtil(t)
mSysProbe.EXPECT().GetDiscoveryServices().
Return(cr.servicesResp, nil).
Times(1)

_, mHostname := hostnameinterface.NewMock(hostnameinterface.MockHostname(host))

mTimer := NewMocktimer(ctrl)
mTimer.EXPECT().Now().Return(cr.time).AnyTimes()

// set mocks
check.os.(*linuxImpl).getSysProbeClient = func(_ string) (net.SysProbeUtil, error) {
return mSysProbe, nil
check.os.(*linuxImpl).getDiscoveryServices = func(_ *http.Client) (*model.ServicesResponse, error) {
return cr.servicesResp, nil
}
check.os.(*linuxImpl).time = mTimer
check.sender.hostname = mHostname
Expand Down
63 changes: 53 additions & 10 deletions pkg/languagedetection/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@
package languagedetection

import (
"bytes"
"io"
"net/http"
"regexp"
"runtime"
"strings"
"time"

"google.golang.org/protobuf/proto"

sysprobeclient "github.com/DataDog/datadog-agent/cmd/system-probe/api/client"
sysconfig "github.com/DataDog/datadog-agent/cmd/system-probe/config"
"github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/languagedetection/internal/detectors"
"github.com/DataDog/datadog-agent/pkg/languagedetection/languagemodels"
"github.com/DataDog/datadog-agent/pkg/process/net"
languagepb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/languagedetection"
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand Down Expand Up @@ -140,15 +147,8 @@ func DetectLanguage(procs []languagemodels.Process, sysprobeConfig model.Reader)
}()

log.Trace("[language detection] Requesting language from system probe")
util, err := net.GetRemoteSystemProbeUtil(
sysprobeConfig.GetString("system_probe_config.sysprobe_socket"),
)
if err != nil {
log.Warn("[language detection] Failed to request language:", err)
return langs
}

privilegedLangs, err := util.DetectLanguage(unknownPids)
sysprobeClient := sysprobeclient.Get(sysprobeConfig.GetString("system_probe_config.sysprobe_socket"))
privilegedLangs, err := detectLanguage(sysprobeClient, unknownPids)
if err != nil {
log.Warn("[language detection] Failed to request language:", err)
return langs
Expand All @@ -161,6 +161,49 @@ func DetectLanguage(procs []languagemodels.Process, sysprobeConfig model.Reader)
return langs
}

// detectLanguage asks system-probe's language detection module for the
// language of each process in pids.
//
// The PIDs are sent as a protobuf-encoded DetectLanguageRequest in the body of
// a GET request to the module's "/detect" endpoint, and the response body is
// decoded as a DetectLanguageResponse.
//
// The returned slice always has exactly len(pids) entries. Entry i is filled
// from the i-th language in the response (presumably reported in the same
// order as pids — confirm against the module's handler). Positions with no
// matching response entry keep the zero Language value, and response entries
// beyond len(pids) are ignored.
//
// An error is returned if the request cannot be encoded, built or sent, or if
// the response body cannot be read or decoded.
func detectLanguage(client *http.Client, pids []int32) ([]languagemodels.Language, error) {
	procs := make([]*languagepb.Process, len(pids))
	for i, pid := range pids {
		procs[i] = &languagepb.Process{Pid: pid}
	}
	reqBytes, err := proto.Marshal(&languagepb.DetectLanguageRequest{Processes: procs})
	if err != nil {
		return nil, err
	}

	url := sysprobeclient.ModuleURL(sysconfig.LanguageDetectionModule, "/detect")
	req, err := http.NewRequest(http.MethodGet, url, bytes.NewReader(reqBytes))
	if err != nil {
		return nil, err
	}

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	// NOTE(review): the HTTP status code is not inspected here, so the body of
	// a non-200 reply is handed straight to proto.Unmarshal below. Consider
	// rejecting non-success statuses explicitly.
	resBody, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var resProto languagepb.DetectLanguageResponse
	err = proto.Unmarshal(resBody, &resProto)
	if err != nil {
		return nil, err
	}

	langs := make([]languagemodels.Language, len(pids))
	for i, lang := range resProto.Languages {
		// The response length is controlled by the remote side; never index
		// past the slice sized from the request.
		if i >= len(langs) {
			break
		}
		langs[i] = languagemodels.Language{
			Name:    languagemodels.LanguageName(lang.Name),
			Version: lang.Version,
		}
	}
	return langs, nil
}

func privilegedLanguageDetectionEnabled(sysProbeConfig model.Reader) bool {
if sysProbeConfig == nil {
return false
Expand Down
1 change: 0 additions & 1 deletion pkg/languagedetection/detector_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

Expand Down
72 changes: 0 additions & 72 deletions pkg/process/net/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,18 @@ package net
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"

model "github.com/DataDog/agent-payload/v5/process"
"google.golang.org/protobuf/proto"

discoverymodel "github.com/DataDog/datadog-agent/pkg/collector/corechecks/servicediscovery/model"
"github.com/DataDog/datadog-agent/pkg/languagedetection/languagemodels"
netEncoding "github.com/DataDog/datadog-agent/pkg/network/encoding/unmarshal"
nppayload "github.com/DataDog/datadog-agent/pkg/networkpath/payload"
procEncoding "github.com/DataDog/datadog-agent/pkg/process/encoding"
reqEncoding "github.com/DataDog/datadog-agent/pkg/process/encoding/request"
languagepb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/languagedetection"
pbgo "github.com/DataDog/datadog-agent/pkg/proto/pbgo/process"
"github.com/DataDog/datadog-agent/pkg/util/funcs"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -287,73 +282,6 @@ func (r *RemoteSysProbeUtil) Register(clientID string) error {
return nil
}

// DetectLanguage asks system-probe for the language of each process in pids.
//
// The PIDs are sent as a protobuf-encoded DetectLanguageRequest in the body of
// a GET request to languageDetectionURL, and the response body is decoded as a
// DetectLanguageResponse.
//
// The returned slice always has exactly len(pids) entries. Entry i is filled
// from the i-th language in the response; positions with no matching response
// entry keep the zero Language value, and response entries beyond len(pids)
// are ignored.
//
//nolint:revive // TODO(PROC) Fix revive linter
func (r *RemoteSysProbeUtil) DetectLanguage(pids []int32) ([]languagemodels.Language, error) {
	procs := make([]*languagepb.Process, len(pids))
	for i, pid := range pids {
		procs[i] = &languagepb.Process{Pid: pid}
	}
	reqBytes, err := proto.Marshal(&languagepb.DetectLanguageRequest{Processes: procs})
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest(http.MethodGet, languageDetectionURL, bytes.NewBuffer(reqBytes))
	if err != nil {
		return nil, err
	}

	res, err := r.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	// NOTE(review): the HTTP status code is not inspected, so the body of a
	// non-200 reply is handed straight to proto.Unmarshal below.
	resBody, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var resProto languagepb.DetectLanguageResponse
	err = proto.Unmarshal(resBody, &resProto)
	if err != nil {
		return nil, err
	}

	langs := make([]languagemodels.Language, len(pids))
	for i, lang := range resProto.Languages {
		// The response length is controlled by the remote side; never index
		// past the slice sized from the request.
		if i >= len(langs) {
			break
		}
		langs[i] = languagemodels.Language{
			Name:    languagemodels.LanguageName(lang.Name),
			Version: lang.Version,
		}
	}
	return langs, nil
}

// GetDiscoveryServices issues a GET to discoveryServicesURL through the
// utility's HTTP client and decodes the JSON reply into a ServicesResponse.
// Any status other than 200 OK is reported as an error.
func (r *RemoteSysProbeUtil) GetDiscoveryServices() (*discoverymodel.ServicesResponse, error) {
	request, err := http.NewRequest(http.MethodGet, discoveryServicesURL, nil)
	if err != nil {
		return nil, err
	}

	httpResp, err := r.httpClient.Do(request)
	if err != nil {
		return nil, err
	}
	defer httpResp.Body.Close()

	if code := httpResp.StatusCode; code != http.StatusOK {
		return nil, fmt.Errorf("got non-success status code: path %s, url: %s, status_code: %d", r.path, discoveryServicesURL, code)
	}

	var services discoverymodel.ServicesResponse
	if err := json.NewDecoder(httpResp.Body).Decode(&services); err != nil {
		return nil, err
	}
	return &services, nil
}

func (r *RemoteSysProbeUtil) init() error {
resp, err := r.httpClient.Get(statsURL)
if err != nil {
Expand Down
16 changes: 7 additions & 9 deletions pkg/process/net/common_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ import (
)

const (
pingURL = "http://unix/" + string(sysconfig.PingModule) + "/ping/"
tracerouteURL = "http://unix/" + string(sysconfig.TracerouteModule) + "/traceroute/"
connectionsURL = "http://unix/" + string(sysconfig.NetworkTracerModule) + "/connections"
networkIDURL = "http://unix/" + string(sysconfig.NetworkTracerModule) + "/network_id"
procStatsURL = "http://unix/" + string(sysconfig.ProcessModule) + "/stats"
registerURL = "http://unix/" + string(sysconfig.NetworkTracerModule) + "/register"
statsURL = "http://unix/debug/stats"
languageDetectionURL = "http://unix/" + string(sysconfig.LanguageDetectionModule) + "/detect"
discoveryServicesURL = "http://unix/" + string(sysconfig.DiscoveryModule) + "/services"
pingURL = "http://unix/" + string(sysconfig.PingModule) + "/ping/"
tracerouteURL = "http://unix/" + string(sysconfig.TracerouteModule) + "/traceroute/"
connectionsURL = "http://unix/" + string(sysconfig.NetworkTracerModule) + "/connections"
networkIDURL = "http://unix/" + string(sysconfig.NetworkTracerModule) + "/network_id"
procStatsURL = "http://unix/" + string(sysconfig.ProcessModule) + "/stats"
registerURL = "http://unix/" + string(sysconfig.NetworkTracerModule) + "/register"
statsURL = "http://unix/debug/stats"
)

// CheckPath is used in conjunction with calling the stats endpoint, since we are calling this
Expand Down
12 changes: 0 additions & 12 deletions pkg/process/net/common_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

model "github.com/DataDog/agent-payload/v5/process"

discoverymodel "github.com/DataDog/datadog-agent/pkg/collector/corechecks/servicediscovery/model"
"github.com/DataDog/datadog-agent/pkg/languagedetection/languagemodels"
nppayload "github.com/DataDog/datadog-agent/pkg/networkpath/payload"
)

Expand Down Expand Up @@ -63,16 +61,6 @@ func (r *RemoteSysProbeUtil) Register(_ string) error {
return ErrNotImplemented
}

// DetectLanguage is not supported in this build; it ignores its argument and
// always returns ErrNotImplemented.
func (r *RemoteSysProbeUtil) DetectLanguage(_ []int32) ([]languagemodels.Language, error) {
	return nil, ErrNotImplemented
}

// GetDiscoveryServices is not supported in this build; it always returns a
// nil response together with ErrNotImplemented.
func (r *RemoteSysProbeUtil) GetDiscoveryServices() (*discoverymodel.ServicesResponse, error) {
return nil, ErrNotImplemented
}

// GetPing is not supported
func (r *RemoteSysProbeUtil) GetPing(_ string, _ string, _ int, _ time.Duration, _ time.Duration) ([]byte, error) {
return nil, ErrNotImplemented
Expand Down
13 changes: 5 additions & 8 deletions pkg/process/net/common_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@ import (
)

const (
connectionsURL = "http://localhost:3333/" + string(sysconfig.NetworkTracerModule) + "/connections"
networkIDURL = "http://unix/" + string(sysconfig.NetworkTracerModule) + "/network_id"
registerURL = "http://localhost:3333/" + string(sysconfig.NetworkTracerModule) + "/register"
languageDetectionURL = "http://localhost:3333/" + string(sysconfig.LanguageDetectionModule) + "/detect"
statsURL = "http://localhost:3333/debug/stats"
tracerouteURL = "http://localhost:3333/" + string(sysconfig.TracerouteModule) + "/traceroute/"
connectionsURL = "http://localhost:3333/" + string(sysconfig.NetworkTracerModule) + "/connections"
networkIDURL = "http://unix/" + string(sysconfig.NetworkTracerModule) + "/network_id"
registerURL = "http://localhost:3333/" + string(sysconfig.NetworkTracerModule) + "/register"
statsURL = "http://localhost:3333/debug/stats"
tracerouteURL = "http://localhost:3333/" + string(sysconfig.TracerouteModule) + "/traceroute/"

// discovery* is not used on Windows, the value is added to avoid a compilation error
discoveryServicesURL = "http://localhost:3333/" + string(sysconfig.DiscoveryModule) + "/services"
// procStatsURL is not used in windows, the value is added to avoid compilation error in windows
procStatsURL = "http://localhost:3333/" + string(sysconfig.ProcessModule) + "stats"
// pingURL is not used in windows, the value is added to avoid compilation error in windows
Expand Down
Loading

0 comments on commit 45a9bcf

Please sign in to comment.