Skip to content

Commit

Permalink
[NETPATH-235][Network Path] Refactor Protocol and fix protocol in met…
Browse files Browse the repository at this point in the history
…ric tag (#27763)
  • Loading branch information
AlexandreYang authored Jul 23, 2024
1 parent e483ed0 commit 69ec7b2
Show file tree
Hide file tree
Showing 15 changed files with 94 additions and 31 deletions.
3 changes: 2 additions & 1 deletion cmd/system-probe/modules/traceroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
sysconfigtypes "github.com/DataDog/datadog-agent/cmd/system-probe/config/types"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
tracerouteutil "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/optional"
Expand Down Expand Up @@ -133,7 +134,7 @@ func parseParams(req *http.Request) (tracerouteutil.Config, error) {
DestPort: uint16(port),
MaxTTL: uint8(maxTTL),
TimeoutMs: uint(timeout),
Protocol: protocol,
Protocol: payload.Protocol(protocol),
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ package common
import (
"encoding/binary"
"hash/fnv"

"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
)

// Pathtest details of information necessary to run a traceroute (pathtrace)
type Pathtest struct {
Hostname string
Port uint16
Protocol string
Protocol payload.Protocol
SourceContainerID string
}

Expand Down
4 changes: 2 additions & 2 deletions comp/networkpath/npcollector/npcollectorimpl/npcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *npCollectorImpl) ScheduleConns(conns []*model.Connection) {
for _, conn := range conns {
remoteAddr := conn.Raddr
remotePort := uint16(conn.Raddr.GetPort())
protocol := conn.GetType().String()
protocol := convertProtocol(conn.GetType())
if !shouldScheduleNetworkPathForConn(conn) {
s.logger.Tracef("Skipped connection: addr=%s, port=%d, protocol=%s", remoteAddr, remotePort, protocol)
continue
Expand All @@ -136,7 +136,7 @@ func (s *npCollectorImpl) ScheduleConns(conns []*model.Connection) {

// scheduleOne schedules pathtests.
// It shouldn't block, if the input channel is full, an error is returned.
func (s *npCollectorImpl) scheduleOne(hostname string, port uint16, protocol string, sourceContainerID string) error {
func (s *npCollectorImpl) scheduleOne(hostname string, port uint16, protocol payload.Protocol, sourceContainerID string) error {
if s.pathtestInputChan == nil {
return errors.New("no input channel, please check that network path is enabled")
}
Expand Down
15 changes: 8 additions & 7 deletions comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func Test_NpCollector_runningAndProcessing(t *testing.T) {
"collector:network_path_collector",
"destination_hostname:abc",
"destination_port:80",
"protocol:udp",
"protocol:UDP",
}
assert.Contains(t, calls, teststatsd.MetricsArgs{Name: "datadog.network_path.path.monitored", Value: 1, Tags: tags, Rate: 1})

Expand Down Expand Up @@ -344,7 +344,7 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
},
},
expectedPathtests: []*common.Pathtest{
{Hostname: "10.0.0.4", Port: uint16(80), Protocol: "tcp", SourceContainerID: "testId1"},
{Hostname: "10.0.0.4", Port: uint16(80), Protocol: payload.ProtocolTCP, SourceContainerID: "testId1"},
},
},
{
Expand All @@ -359,7 +359,7 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
},
},
expectedPathtests: []*common.Pathtest{
{Hostname: "10.0.0.6", Port: uint16(161), Protocol: "udp", SourceContainerID: "testId1"},
{Hostname: "10.0.0.6", Port: uint16(161), Protocol: payload.ProtocolUDP, SourceContainerID: "testId1"},
},
},
{
Expand Down Expand Up @@ -399,7 +399,7 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
},
},
expectedPathtests: []*common.Pathtest{
{Hostname: "10.0.0.4", Port: uint16(80), Protocol: "tcp", SourceContainerID: "testId2"},
{Hostname: "10.0.0.4", Port: uint16(80), Protocol: payload.ProtocolTCP, SourceContainerID: "testId2"},
},
},
{
Expand Down Expand Up @@ -457,7 +457,7 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
},
},
expectedPathtests: []*common.Pathtest{
{Hostname: "10.0.0.4", Port: uint16(80), Protocol: "tcp", SourceContainerID: "testId3"},
{Hostname: "10.0.0.4", Port: uint16(80), Protocol: payload.ProtocolTCP, SourceContainerID: "testId3"},
},
expectedLogs: []logCount{},
},
Expand Down Expand Up @@ -648,13 +648,14 @@ func Test_npCollectorImpl_sendTelemetry(t *testing.T) {
path := payload.NetworkPath{
Source: payload.NetworkPathSource{Hostname: "abc"},
Destination: payload.NetworkPathDestination{Hostname: "abc", IPAddress: "10.0.0.2", Port: 80},
Protocol: payload.ProtocolUDP,
Hops: []payload.NetworkPathHop{
{Hostname: "hop_1", IPAddress: "1.1.1.1"},
{Hostname: "hop_2", IPAddress: "1.1.1.2"},
},
}
ptestCtx := &pathteststore.PathtestContext{
Pathtest: &common.Pathtest{Hostname: "10.0.0.2", Port: 80},
Pathtest: &common.Pathtest{Hostname: "10.0.0.2", Port: 80, Protocol: payload.ProtocolUDP},
}
ptestCtx.SetLastFlushInterval(2 * time.Minute)
npCollector.TimeNowFn = MockTimeNow
Expand All @@ -669,7 +670,7 @@ func Test_npCollectorImpl_sendTelemetry(t *testing.T) {
"collector:network_path_collector",
"destination_hostname:abc",
"destination_port:80",
"protocol:udp",
"protocol:UDP",
}
assert.Contains(t, calls, teststatsd.MetricsArgs{Name: "datadog.network_path.check_duration", Value: 3, Tags: tags, Rate: 1})
assert.Contains(t, calls, teststatsd.MetricsArgs{Name: "datadog.network_path.check_interval", Value: (2 * time.Minute).Seconds(), Tags: tags, Rate: 1})
Expand Down
10 changes: 10 additions & 0 deletions comp/networkpath/npcollector/npcollectorimpl/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"

model "github.com/DataDog/agent-payload/v5/process"
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
)

func shouldScheduleNetworkPathForConn(conn *model.Connection) bool {
Expand All @@ -21,3 +22,12 @@ func shouldScheduleNetworkPathForConn(conn *model.Connection) bool {
}
return conn.Family == model.ConnectionFamily_v4
}

func convertProtocol(connType model.ConnectionType) payload.Protocol {
if connType == model.ConnectionType_tcp {
return payload.ProtocolTCP
} else if connType == model.ConnectionType_udp {
return payload.ProtocolUDP
}
return ""
}
6 changes: 6 additions & 0 deletions comp/networkpath/npcollector/npcollectorimpl/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

model "github.com/DataDog/agent-payload/v5/process"
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -74,3 +75,8 @@ func Test_shouldScheduleNetworkPathForConn(t *testing.T) {
})
}
}

func Test_convertProtocol(t *testing.T) {
assert.Equal(t, convertProtocol(model.ConnectionType_udp), payload.ProtocolUDP)
assert.Equal(t, convertProtocol(model.ConnectionType_tcp), payload.ProtocolTCP)
}
6 changes: 4 additions & 2 deletions pkg/collector/corechecks/networkpath/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package networkpath

import (
"fmt"
"strings"
"time"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
coreconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -49,7 +51,7 @@ type CheckConfig struct {
SourceService string
DestinationService string
MaxTTL uint8
Protocol string
Protocol payload.Protocol
TimeoutMs uint
MinCollectionInterval time.Duration
Tags []string
Expand Down Expand Up @@ -79,7 +81,7 @@ func NewCheckConfig(rawInstance integration.Data, rawInitConfig integration.Data
c.DestinationService = instance.DestinationService
c.MaxTTL = instance.MaxTTL
c.TimeoutMs = instance.TimeoutMs
c.Protocol = instance.Protocol
c.Protocol = payload.Protocol(strings.ToUpper(instance.Protocol))

c.MinCollectionInterval = firstNonZero(
time.Duration(instance.MinCollectionInterval)*time.Second,
Expand Down
43 changes: 43 additions & 0 deletions pkg/collector/corechecks/networkpath/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
coreconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -110,6 +111,48 @@ destination_service: service-b
Namespace: "my-namespace",
},
},
{
name: "lower case protocol",
rawInstance: []byte(`
hostname: 1.2.3.4
protocol: udp
`),
rawInitConfig: []byte(``),
expectedConfig: &CheckConfig{
DestHostname: "1.2.3.4",
MinCollectionInterval: time.Duration(60) * time.Second,
Namespace: "my-namespace",
Protocol: payload.ProtocolUDP,
},
},
{
name: "lower case protocol",
rawInstance: []byte(`
hostname: 1.2.3.4
protocol: UDP
`),
rawInitConfig: []byte(``),
expectedConfig: &CheckConfig{
DestHostname: "1.2.3.4",
MinCollectionInterval: time.Duration(60) * time.Second,
Namespace: "my-namespace",
Protocol: payload.ProtocolUDP,
},
},
{
name: "lower case protocol",
rawInstance: []byte(`
hostname: 1.2.3.4
protocol: TCP
`),
rawInitConfig: []byte(``),
expectedConfig: &CheckConfig{
DestHostname: "1.2.3.4",
MinCollectionInterval: time.Duration(60) * time.Second,
Namespace: "my-namespace",
Protocol: payload.ProtocolTCP,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/networkpath/payload/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package payload
import "github.com/DataDog/datadog-agent/pkg/network"

// Protocol defines supported network protocols
// Please define new protocols based on the Keyword from:
// https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
type Protocol string

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkpath/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func SubmitNetworkPathTelemetry(sender metricsender.MetricSender, path payload.N
}
newTags := append(utils.CopyStrings(tags), []string{
"collector:" + string(pathSource),
"protocol:udp", // TODO: Update to protocol from config when we support tcp/icmp
"protocol:" + string(path.Protocol),
"destination_hostname:" + path.Destination.Hostname,
"destination_port:" + destPortTag,
}...)
Expand Down
5 changes: 4 additions & 1 deletion pkg/networkpath/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestSubmitNetworkPathTelemetry(t *testing.T) {
"destination_hostname:abc",
"destination_port:unspecified",
"foo:bar",
"protocol:udp",
"protocol:UDP",
"tag2:val2",
}
tests := []struct {
Expand All @@ -37,6 +37,7 @@ func TestSubmitNetworkPathTelemetry(t *testing.T) {
name: "with hops and interval",
path: payload.NetworkPath{
Destination: payload.NetworkPathDestination{Hostname: "abc"},
Protocol: payload.ProtocolUDP,
Hops: []payload.NetworkPathHop{
{Hostname: "hop_1", IPAddress: "1.1.1.1"},
{Hostname: "hop_2", IPAddress: "1.1.1.2"},
Expand Down Expand Up @@ -82,6 +83,7 @@ func TestSubmitNetworkPathTelemetry(t *testing.T) {
name: "with last hop successful",
path: payload.NetworkPath{
Destination: payload.NetworkPathDestination{Hostname: "abc"},
Protocol: payload.ProtocolUDP,
Hops: []payload.NetworkPathHop{
{Hostname: "hop_1", IPAddress: "1.1.1.1"},
{Hostname: "hop_2", IPAddress: "1.1.1.2", Success: true},
Expand Down Expand Up @@ -127,6 +129,7 @@ func TestSubmitNetworkPathTelemetry(t *testing.T) {
name: "no hops and no interval",
path: payload.NetworkPath{
Destination: payload.NetworkPathDestination{Hostname: "abc"},
Protocol: payload.ProtocolUDP,
Hops: []payload.NetworkPathHop{},
},
checkDuration: 10 * time.Second,
Expand Down
9 changes: 4 additions & 5 deletions pkg/networkpath/traceroute/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"net"
"os"
"sort"
"strings"
"time"

"github.com/Datadog/dublin-traceroute/go/dublintraceroute/probes/probev4"
Expand Down Expand Up @@ -131,22 +130,22 @@ func (r *Runner) RunTraceroute(ctx context.Context, cfg Config) (payload.Network
}

var pathResult payload.NetworkPath
protocol := strings.ToUpper(cfg.Protocol)
var protocol = cfg.Protocol

// default to UDP if protocol
// is not set
if protocol == "" {
protocol = UDP
protocol = payload.ProtocolUDP
}
switch protocol {
case TCP:
case payload.ProtocolTCP:
log.Debugf("Running TCP traceroute for: %+v", cfg)
pathResult, err = r.runTCP(cfg, hname, dest, maxTTL, timeout)
if err != nil {
tracerouteRunnerTelemetry.failedRuns.Inc()
return payload.NetworkPath{}, err
}
case UDP:
case payload.ProtocolUDP:
log.Debugf("Running UDP traceroute for: %+v", cfg)
pathResult, err = r.runUDP(cfg, hname, dest, maxTTL, timeout)
if err != nil {
Expand Down
9 changes: 1 addition & 8 deletions pkg/networkpath/traceroute/traceroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
)

const (
// UDP represents the UDP protocol
UDP = "UDP"
// TCP represents the TCP protocol
TCP = "TCP"
)

type (
// Config specifies the configuration of an instance
// of Traceroute
Expand All @@ -40,7 +33,7 @@ type (
TimeoutMs uint
// Protocol is the protocol to use
// for traceroute, default is UDP
Protocol string
Protocol payload.Protocol
}

// Traceroute defines an interface for running
Expand Down
4 changes: 2 additions & 2 deletions pkg/networkpath/traceroute/traceroute_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func New(cfg Config, telemetry telemetry.Component) (*MacTraceroute, error) {

// TCP is not supported at the moment due to the
// way go listensn for TCP in our implementation on BSD systems
if cfg.Protocol == TCP {
if cfg.Protocol == payload.ProtocolTCP {
return nil, fmt.Errorf(tcpNotSupportedMsg)
}

Expand All @@ -55,7 +55,7 @@ func (m *MacTraceroute) Run(ctx context.Context) (payload.NetworkPath, error) {

// TCP is not supported at the moment due to the
// way go listens for TCP in our implementation on BSD systems
if m.cfg.Protocol == TCP {
if m.cfg.Protocol == payload.ProtocolTCP {
return payload.NetworkPath{}, fmt.Errorf(tcpNotSupportedMsg)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/process/net/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/languagedetection/languagemodels"
netEncoding "github.com/DataDog/datadog-agent/pkg/network/encoding/unmarshal"
nppayload "github.com/DataDog/datadog-agent/pkg/networkpath/payload"
procEncoding "github.com/DataDog/datadog-agent/pkg/process/encoding"
reqEncoding "github.com/DataDog/datadog-agent/pkg/process/encoding/request"
languagepb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/languagedetection"
Expand Down Expand Up @@ -197,7 +198,7 @@ func (r *RemoteSysProbeUtil) GetPing(clientID string, host string, count int, in
}

// GetTraceroute returns the results of a traceroute to a host
func (r *RemoteSysProbeUtil) GetTraceroute(clientID string, host string, port uint16, protocol string, maxTTL uint8, timeout uint) ([]byte, error) {
func (r *RemoteSysProbeUtil) GetTraceroute(clientID string, host string, port uint16, protocol nppayload.Protocol, maxTTL uint8, timeout uint) ([]byte, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("%s/%s?client_id=%s&port=%d&max_ttl=%d&timeout=%d&protocol=%s", tracerouteURL, host, clientID, port, maxTTL, timeout, protocol), nil)
if err != nil {
return nil, err
Expand Down

0 comments on commit 69ec7b2

Please sign in to comment.