From 28291f0ff009dad19269a95d4ad262bfa6ce107c Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Wed, 9 Oct 2024 15:34:50 -0400 Subject: [PATCH 01/32] Allow rdsquerier to run in snmp core check --- cmd/agent/subcommands/run/command.go | 6 ++++ pkg/cli/subcommands/check/command.go | 4 +++ pkg/collector/check/context.go | 26 ++++++++++++++- .../internal/report/report_device_metadata.go | 32 +++++++++++++++++++ pkg/networkdevice/metadata/payload.go | 1 + 5 files changed, 68 insertions(+), 1 deletion(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 45da16dba5426..51265000ccf2b 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -155,6 +155,8 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/optional" "github.com/DataDog/datadog-agent/pkg/version" + rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" + // runtime init routines ddruntime "github.com/DataDog/datadog-agent/pkg/runtime" ) @@ -255,6 +257,7 @@ func run(log log.Component, settings settings.Component, _ optional.Option[gui.Component], _ agenttelemetry.Component, + rdnsquerier rdnsquerier.Component, ) error { defer func() { stopAgent() @@ -319,6 +322,7 @@ func run(log log.Component, cloudfoundrycontainer, jmxlogger, settings, + rdnsquerier, ); err != nil { return err } @@ -494,6 +498,7 @@ func startAgent( _ cloudfoundrycontainer.Component, jmxLogger jmxlogger.Component, settings settings.Component, + rdnsquerier rdnsquerier.Component, ) error { var err error @@ -571,6 +576,7 @@ func startAgent( // TODO: (components) - Until the checks are components we set there context so they can depends on components. 
check.InitializeInventoryChecksContext(invChecks) + check.InitializeRDNSQuerierContext(rdnsquerier) // Init JMX runner and inject dogstatsd component jmxfetch.InitRunner(server, jmxLogger) diff --git a/pkg/cli/subcommands/check/command.go b/pkg/cli/subcommands/check/command.go index 9e33f24d63548..ab0763af0db74 100644 --- a/pkg/cli/subcommands/check/command.go +++ b/pkg/cli/subcommands/check/command.go @@ -65,6 +65,7 @@ import ( integrations "github.com/DataDog/datadog-agent/comp/logs/integrations/def" "github.com/DataDog/datadog-agent/comp/metadata/inventorychecks" "github.com/DataDog/datadog-agent/comp/metadata/inventorychecks/inventorychecksimpl" + rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" "github.com/DataDog/datadog-agent/comp/remote-config/rcservice" "github.com/DataDog/datadog-agent/comp/remote-config/rcservicemrf" "github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl" @@ -264,6 +265,7 @@ func run( jmxLogger jmxlogger.Component, telemetry telemetry.Component, logReceiver optional.Option[integrations.Component], + rdnsquerier rdnsquerier.Component, ) error { previousIntegrationTracing := false previousIntegrationTracingExhaustive := false @@ -288,6 +290,8 @@ func run( // TODO: (components) - Until the checks are components we set there context so they can depends on components. check.InitializeInventoryChecksContext(invChecks) + check.InitializeRDNSQuerierContext(rdnsquerier) + pkgcollector.InitPython(common.GetPythonPaths()...) 
commonchecks.RegisterChecks(wmeta, config, telemetry) diff --git a/pkg/collector/check/context.go b/pkg/collector/check/context.go index a1e39e4145376..f8226b6cd8ca5 100644 --- a/pkg/collector/check/context.go +++ b/pkg/collector/check/context.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/DataDog/datadog-agent/comp/metadata/inventorychecks" + rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" ) // checkContext holds a list of reference to different components used by Go and Python checks. @@ -20,7 +21,8 @@ import ( // of C to Go. This way python checks can submit metadata to inventorychecks through the 'SetCheckMetadata' python // method. type checkContext struct { - ic inventorychecks.Component + ic inventorychecks.Component + rdnsquerier rdnsquerier.Component } var ctx checkContext @@ -47,10 +49,32 @@ func InitializeInventoryChecksContext(ic inventorychecks.Component) { } } +// GetRDNSQuerierContext returns a reference to the rdnsquerier component for Python and Go checks to use. 
+func GetRDNSQuerierContext() (rdnsquerier.Component, error) { + checkContextMutex.Lock() + defer checkContextMutex.Unlock() + + if ctx.rdnsquerier == nil { + return nil, errors.New("rdnsquerier context was not set") + } + return ctx.rdnsquerier, nil +} + +// InitializeRDNSQuerierContext set the reference to rdnsquerier in checkContext +func InitializeRDNSQuerierContext(rdnsquerier rdnsquerier.Component) { + checkContextMutex.Lock() + defer checkContextMutex.Unlock() + + if ctx.rdnsquerier == nil { + ctx.rdnsquerier = rdnsquerier + } +} + // ReleaseContext reset to nil all the references hold by the current context func ReleaseContext() { checkContextMutex.Lock() defer checkContextMutex.Unlock() ctx.ic = nil + ctx.rdnsquerier = nil } diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index ff8d90912efb3..fb78d17536666 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -11,6 +11,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" @@ -21,6 +22,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/networkdevice/profile/profiledefinition" "github.com/DataDog/datadog-agent/pkg/networkdevice/utils" + "github.com/DataDog/datadog-agent/pkg/collector/check" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/snmp/internal/checkconfig" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/snmp/internal/common" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/snmp/internal/lldp" @@ -214,6 +216,8 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc vendor = config.ProfileDef.Device.Vendor } + hostname := lookupHostnameWithRDNS(config.IPAddress) + return devicemetadata.DeviceMetadata{ ID: deviceID, IDTags: idTags, @@ -238,9 +242,37 @@ func 
buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc OsHostname: osHostname, DeviceType: deviceType, Integration: common.SnmpIntegrationName, + RDNSHostname: hostname, } } +func lookupHostnameWithRDNS(ip string) string { + if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { + var hostname string + var wg sync.WaitGroup + + wg.Add(1) + err := rdnsquerier.GetHostname( + net.ParseIP(ip).To4(), + func(h string) { + hostname = h + wg.Done() + }, + func(h string, err error) { + hostname = h + wg.Done() + }, + ) + if err != nil { + log.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ip, err) + wg.Done() + } + wg.Wait() + return hostname + } + return "" +} + func getProfileVersion(config *checkconfig.CheckConfig) uint64 { var profileVersion uint64 if config.ProfileDef != nil { diff --git a/pkg/networkdevice/metadata/payload.go b/pkg/networkdevice/metadata/payload.go index 8e5efc545770e..e20cf17ede437 100644 --- a/pkg/networkdevice/metadata/payload.go +++ b/pkg/networkdevice/metadata/payload.go @@ -73,6 +73,7 @@ type DeviceMetadata struct { OsHostname string `json:"os_hostname,omitempty"` Integration string `json:"integration,omitempty"` // indicates the source of the data SNMP, meraki_api, etc. 
DeviceType string `json:"device_type,omitempty"` + RDNSHostname string `json:"rdns_hostname,omitempty"` } // DeviceOID device scan oid data From 6fdcbf4fc4dca78133938f8a3a8605025cf5d3ff Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Thu, 10 Oct 2024 16:25:38 -0400 Subject: [PATCH 02/32] Move Sync rDNS lookup to querier --- comp/rdnsquerier/def/component.go | 1 + comp/rdnsquerier/impl/rdnsquerier.go | 48 +++++++++++++++++++ comp/rdnsquerier/impl/rdnsquerier_test.go | 43 +++++++++++++++++ .../internal/report/report_device_metadata.go | 33 ++----------- 4 files changed, 96 insertions(+), 29 deletions(-) diff --git a/comp/rdnsquerier/def/component.go b/comp/rdnsquerier/def/component.go index 88a64bcfa718f..d756204b9b320 100644 --- a/comp/rdnsquerier/def/component.go +++ b/comp/rdnsquerier/def/component.go @@ -11,4 +11,5 @@ package rdnsquerier // Component is the component type. type Component interface { GetHostname([]byte, func(string), func(string, error)) error + GetHostnameSync(string) (string, error) } diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index c943d795cc23c..e3a688ca3a0ec 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -9,7 +9,9 @@ package rdnsquerierimpl import ( "context" "fmt" + "net" "net/netip" + "sync" "github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" @@ -181,6 +183,52 @@ func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(str return nil } +// GetHostnameSync attempts to resolve the hostname for the given IP address synchronously. +// If the IP address is invalid then an error is returned. +// If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned. +// If the IP address is in the private address space then the IP address will be resolved to a hostname. 
+func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string) (string, error) { + q.internalTelemetry.total.Inc() + + // netipAddr, ok := netip.AddrFromSlice(ipAddr) + netipAddr := net.ParseIP(ipAddr).To4() + if netipAddr == nil { + q.internalTelemetry.invalidIPAddress.Inc() + return "", fmt.Errorf("invalid IP address %v", ipAddr) + } + + if !netipAddr.IsPrivate() { + q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", netipAddr) + return "", nil + } + q.internalTelemetry.private.Inc() + + var hostname string + var err error + var wg sync.WaitGroup + + wg.Add(1) + err = q.GetHostname( + netipAddr, + func(h string) { + hostname = h + wg.Done() + }, + func(h string, e error) { + hostname = h + err = e + wg.Done() + }, + ) + if err != nil { + q.logger.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ipAddr, err) + wg.Done() + } + wg.Wait() + + return hostname, err +} + func (q *rdnsQuerierImpl) start(_ context.Context) error { if q.started { q.logger.Debugf("Reverse DNS Enrichment already started") diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index fb5102e511815..b83c151786b05 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -986,3 +986,46 @@ func TestCachePersist(t *testing.T) { }) ts.validateExpected(t, expectedTelemetry) } + +func TestGetHostnameSync(t *testing.T) { + overrides := map[string]interface{}{ + "network_devices.netflow.reverse_dns_enrichment_enabled": true, + } + ts := testSetup(t, overrides, true, nil) + internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) + + // Test with invalid IP address + hostname, err := internalRDNSQuerier.GetHostnameSync("invalid_ip") + assert.Error(t, err) + assert.Equal(t, "", hostname) + + // Test with IP address not in private range + hostname, err = internalRDNSQuerier.GetHostnameSync("8.8.8.8") + assert.NoError(t, err) + assert.Equal(t, 
"", hostname) + + // Test with IP address in private range + hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.100") + assert.NoError(t, err) + assert.NotEqual(t, "", hostname) + + // Test with IP address in private range but cache miss + hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.101") + assert.NoError(t, err) + assert.NotEqual(t, "", hostname) + + // // Test with a valid IP address that resolves to a hostname + hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.100") // cached from earlier test + assert.NoError(t, err) + assert.Equal(t, "fakehostname-192.168.1.100", hostname) + + // Test with an empty string as input + hostname, err = internalRDNSQuerier.GetHostnameSync("") + assert.Error(t, err) + assert.Equal(t, "", hostname) + + // Test with an IPv6 address + hostname, err = internalRDNSQuerier.GetHostnameSync("2001:4860:4860::8888") // Google Public DNS + assert.Error(t, err) + assert.Equal(t, "", hostname) +} diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index fb78d17536666..7685a59ea4e5c 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -11,7 +11,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" @@ -216,7 +215,10 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc vendor = config.ProfileDef.Device.Vendor } - hostname := lookupHostnameWithRDNS(config.IPAddress) + hostname := "" + if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { + hostname, _ = rdnsquerier.GetHostnameSync(config.IPAddress) + } return devicemetadata.DeviceMetadata{ ID: deviceID, @@ -246,33 +248,6 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc } } -func 
lookupHostnameWithRDNS(ip string) string { - if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { - var hostname string - var wg sync.WaitGroup - - wg.Add(1) - err := rdnsquerier.GetHostname( - net.ParseIP(ip).To4(), - func(h string) { - hostname = h - wg.Done() - }, - func(h string, err error) { - hostname = h - wg.Done() - }, - ) - if err != nil { - log.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ip, err) - wg.Done() - } - wg.Wait() - return hostname - } - return "" -} - func getProfileVersion(config *checkconfig.CheckConfig) uint64 { var profileVersion uint64 if config.ProfileDef != nil { From 14d6fd0c4d434bdb18b46bdeee1e74b9e7f66665 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Thu, 10 Oct 2024 16:42:41 -0400 Subject: [PATCH 03/32] add release notes --- ...ndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml diff --git a/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml b/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml new file mode 100644 index 0000000000000..d63e7389e3a58 --- /dev/null +++ b/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. 
+--- +features: + - | + Added support for enriching SNMP IPs with hostnames via rDNS lookups From e33a05ea1026aa697f7f22e945b0135f3fa349d7 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Thu, 10 Oct 2024 17:05:55 -0400 Subject: [PATCH 04/32] Add component for rdnsQuerierImplNone --- comp/rdnsquerier/impl-none/none.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/comp/rdnsquerier/impl-none/none.go b/comp/rdnsquerier/impl-none/none.go index 8f3d7cb3100b0..ca10bf84e0135 100644 --- a/comp/rdnsquerier/impl-none/none.go +++ b/comp/rdnsquerier/impl-none/none.go @@ -29,3 +29,8 @@ func (q *rdnsQuerierImplNone) GetHostname(_ []byte, _ func(string), _ func(strin // noop return nil } + +func (q *rdnsQuerierImplNone) GetHostnameSync(_ string) (string, error) { + // noop + return "", nil +} From b186c387d0b52e72d1b74e85b87b2d3bb4a4b5c4 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Wed, 16 Oct 2024 16:30:27 -0400 Subject: [PATCH 05/32] Add component mock --- comp/rdnsquerier/mock/mock.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/comp/rdnsquerier/mock/mock.go b/comp/rdnsquerier/mock/mock.go index 9a1d678f60ae0..6bebec65200dc 100644 --- a/comp/rdnsquerier/mock/mock.go +++ b/comp/rdnsquerier/mock/mock.go @@ -10,6 +10,7 @@ package mock import ( "fmt" + "net" "net/netip" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" @@ -51,3 +52,18 @@ func (q *rdnsQuerierMock) GetHostname(ipAddr []byte, updateHostnameSync func(str return nil } + +// GetHostnameSync simulates resolving the hostname for the given IP address synchronously. If the IP address is in the private address +// space then the resolved hostname is returned. 
+func (q *rdnsQuerierMock) GetHostnameSync(ipAddr string) (string, error) { + ipaddr := net.ParseIP(ipAddr).To4() + if ipaddr == nil { + return "", fmt.Errorf("invalid IP address %v", ipAddr) + } + + if !ipaddr.IsPrivate() { + return "", nil + } + + return "hostname-" + ipaddr.String(), nil +} From 96d868ff55ebe631f4074480659fd009de257451 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Wed, 16 Oct 2024 18:00:49 -0400 Subject: [PATCH 06/32] debug: add log line --- .../corechecks/snmp/internal/report/report_device_metadata.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index 7685a59ea4e5c..e6fe2f394ce15 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -219,6 +219,7 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { hostname, _ = rdnsquerier.GetHostnameSync(config.IPAddress) } + log.Infof("FOUND HOSTNAME: %s", hostname) return devicemetadata.DeviceMetadata{ ID: deviceID, From 44a9d29a90c36fbde44408523eb05a44940cce15 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Thu, 17 Oct 2024 19:52:41 -0400 Subject: [PATCH 07/32] Fix: SNMP CLI command, missing rdsquerier fx import --- pkg/cli/subcommands/check/command.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/cli/subcommands/check/command.go b/pkg/cli/subcommands/check/command.go index ab0763af0db74..fb026972ad5ca 100644 --- a/pkg/cli/subcommands/check/command.go +++ b/pkg/cli/subcommands/check/command.go @@ -66,6 +66,7 @@ import ( "github.com/DataDog/datadog-agent/comp/metadata/inventorychecks" "github.com/DataDog/datadog-agent/comp/metadata/inventorychecks/inventorychecksimpl" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" + 
rdnsquerierfx "github.com/DataDog/datadog-agent/comp/rdnsquerier/fx" "github.com/DataDog/datadog-agent/comp/remote-config/rcservice" "github.com/DataDog/datadog-agent/comp/remote-config/rcservicemrf" "github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl" @@ -178,6 +179,7 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command { autodiscoveryimpl.Module(), forwarder.Bundle(defaultforwarder.NewParams(defaultforwarder.WithNoopForwarder())), inventorychecksimpl.Module(), + rdnsquerierfx.Module(), // inventorychecksimpl depends on a collector and serializer when created to send payload. // Here we just want to collect metadata to be displayed, so we don't need a collector. collector.NoneModule(), From 13be360561d531db6e44ccf46bd090f2873b7dcb Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Thu, 17 Oct 2024 20:56:15 -0400 Subject: [PATCH 08/32] Add timeout the sync rDNS resolver --- cmd/agent/subcommands/run/command.go | 3 +- comp/rdnsquerier/def/component.go | 4 +- comp/rdnsquerier/impl-none/none.go | 4 +- comp/rdnsquerier/impl/rdnsquerier.go | 67 ++++++++++++------- comp/rdnsquerier/impl/rdnsquerier_test.go | 56 +++++++++++----- .../rdnsquerier/impl/rdnsquerier_testutils.go | 12 +++- comp/rdnsquerier/mock/mock.go | 3 +- 7 files changed, 103 insertions(+), 46 deletions(-) diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 51265000ccf2b..552c8e18b2c12 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -117,6 +117,7 @@ import ( "github.com/DataDog/datadog-agent/comp/otelcol/logsagentpipeline" processAgent "github.com/DataDog/datadog-agent/comp/process/agent" processagentStatusImpl "github.com/DataDog/datadog-agent/comp/process/status/statusimpl" + rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" rdnsquerierfx "github.com/DataDog/datadog-agent/comp/rdnsquerier/fx" remoteconfig 
"github.com/DataDog/datadog-agent/comp/remote-config" "github.com/DataDog/datadog-agent/comp/remote-config/rcclient" @@ -155,8 +156,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/optional" "github.com/DataDog/datadog-agent/pkg/version" - rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" - // runtime init routines ddruntime "github.com/DataDog/datadog-agent/pkg/runtime" ) diff --git a/comp/rdnsquerier/def/component.go b/comp/rdnsquerier/def/component.go index d756204b9b320..5b020be1498b2 100644 --- a/comp/rdnsquerier/def/component.go +++ b/comp/rdnsquerier/def/component.go @@ -6,10 +6,12 @@ // Package rdnsquerier provides the reverse DNS querier component. package rdnsquerier +import "time" + // team: network-device-monitoring // Component is the component type. type Component interface { GetHostname([]byte, func(string), func(string, error)) error - GetHostnameSync(string) (string, error) + GetHostnameSync(string, ...time.Duration) (string, error) } diff --git a/comp/rdnsquerier/impl-none/none.go b/comp/rdnsquerier/impl-none/none.go index ca10bf84e0135..5b4899191ac83 100644 --- a/comp/rdnsquerier/impl-none/none.go +++ b/comp/rdnsquerier/impl-none/none.go @@ -7,6 +7,8 @@ package rdnsquerierimpl import ( + "time" + rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" ) @@ -30,7 +32,7 @@ func (q *rdnsQuerierImplNone) GetHostname(_ []byte, _ func(string), _ func(strin return nil } -func (q *rdnsQuerierImplNone) GetHostnameSync(_ string) (string, error) { +func (q *rdnsQuerierImplNone) GetHostnameSync(_ string, _ ...time.Duration) (string, error) { // noop return "", nil } diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index e3a688ca3a0ec..f169678fefe38 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -12,6 +12,7 @@ import ( "net" "net/netip" "sync" + "time" "github.com/DataDog/datadog-agent/comp/core/config" log 
"github.com/DataDog/datadog-agent/comp/core/log/def" @@ -187,10 +188,21 @@ func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(str // If the IP address is invalid then an error is returned. // If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned. // If the IP address is in the private address space then the IP address will be resolved to a hostname. -func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string) (string, error) { +// The function accepts a timeout duration, which defaults to 2 seconds if not provided. +func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string, timeout ...time.Duration) (string, error) { q.internalTelemetry.total.Inc() - // netipAddr, ok := netip.AddrFromSlice(ipAddr) + // Set default timeout to 2 seconds if not provided + var timeoutDuration time.Duration + if len(timeout) > 0 { + timeoutDuration = timeout[0] + } else { + timeoutDuration = 2 * time.Second + } + + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + defer cancel() + netipAddr := net.ParseIP(ipAddr).To4() if netipAddr == nil { q.internalTelemetry.invalidIPAddress.Inc() @@ -199,34 +211,43 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string) (string, error) { if !netipAddr.IsPrivate() { q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", netipAddr) - return "", nil + return "", fmt.Errorf("IP address %v is not in the private address space", netipAddr) } q.internalTelemetry.private.Inc() var hostname string var err error - var wg sync.WaitGroup - - wg.Add(1) - err = q.GetHostname( - netipAddr, - func(h string) { - hostname = h + done := make(chan struct{}) + + go func() { + defer close(done) + var wg sync.WaitGroup + wg.Add(1) + err = q.GetHostname( + netipAddr, + func(h string) { + hostname = h + wg.Done() + }, + func(h string, e error) { + hostname = h + err = e + wg.Done() + }, + ) + if err != nil { + 
q.logger.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ipAddr, err) wg.Done() - }, - func(h string, e error) { - hostname = h - err = e - wg.Done() - }, - ) - if err != nil { - q.logger.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ipAddr, err) - wg.Done() + } + wg.Wait() + }() + + select { + case <-done: + return hostname, err + case <-ctx.Done(): + return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) } - wg.Wait() - - return hostname, err } func (q *rdnsQuerierImpl) start(_ context.Context) error { diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index b83c151786b05..b33fe2a2c50d1 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -20,7 +20,7 @@ func TestStartStop(t *testing.T) { overrides := map[string]interface{}{ "network_devices.netflow.reverse_dns_enrichment_enabled": true, } - ts := testSetup(t, overrides, false, nil) + ts := testSetup(t, overrides, false, nil, 0) internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) assert.NotNil(t, internalRDNSQuerier) @@ -38,7 +38,7 @@ func TestNotStarted(t *testing.T) { overrides := map[string]interface{}{ "network_devices.netflow.reverse_dns_enrichment_enabled": true, } - ts := testSetup(t, overrides, false, nil) + ts := testSetup(t, overrides, false, nil, 0) // IP address in private range err := ts.rdnsQuerier.GetHostname( @@ -66,7 +66,7 @@ func TestNormalOperationsDefaultConfig(t *testing.T) { overrides := map[string]interface{}{ "network_devices.netflow.reverse_dns_enrichment_enabled": true, } - ts := testSetup(t, overrides, true, nil) + ts := testSetup(t, overrides, true, nil, 0) var wg sync.WaitGroup @@ -143,7 +143,7 @@ func TestNormalOperationsCacheDisabled(t *testing.T) { "network_devices.netflow.reverse_dns_enrichment_enabled": true, "reverse_dns_enrichment.cache.enabled": false, } - ts := 
testSetup(t, overrides, true, nil) + ts := testSetup(t, overrides, true, nil, 0) var wg sync.WaitGroup @@ -221,7 +221,7 @@ func TestRateLimiter(t *testing.T) { "network_devices.netflow.reverse_dns_enrichment_enabled": true, "reverse_dns_enrichment.rate_limiter.limit_per_sec": 1, } - ts := testSetup(t, overrides, true, nil) + ts := testSetup(t, overrides, true, nil, 0) // IP addresses in private range for i := range 20 { @@ -279,6 +279,7 @@ func TestRateLimiterThrottled(t *testing.T) { &net.DNSError{Err: "test timeout error", IsTimeout: true}, }}, }, + 0, ) var wg sync.WaitGroup @@ -457,7 +458,7 @@ func TestChannelFullRequestsDroppedWhenRateLimited(t *testing.T) { "reverse_dns_enrichment.rate_limiter.enabled": true, "reverse_dns_enrichment.rate_limiter.limit_per_sec": 1, } - ts := testSetup(t, overrides, true, nil) + ts := testSetup(t, overrides, true, nil, 0) var wg sync.WaitGroup @@ -509,7 +510,7 @@ func TestCacheHitInProgress(t *testing.T) { "network_devices.netflow.reverse_dns_enrichment_enabled": true, "reverse_dns_enrichment.rate_limiter.limit_per_sec": 1, } - ts := testSetup(t, overrides, true, nil) + ts := testSetup(t, overrides, true, nil, 0) var wg sync.WaitGroup @@ -581,6 +582,7 @@ func TestRetries(t *testing.T) { fmt.Errorf("test error")}, }, }, + 0, ) var wg sync.WaitGroup @@ -668,6 +670,7 @@ func TestRetriesExceeded(t *testing.T) { fmt.Errorf("test error3")}, }, }, + 0, ) var wg sync.WaitGroup @@ -725,6 +728,7 @@ func TestIsNotFound(t *testing.T) { &net.DNSError{Err: "no such host", IsNotFound: true}}, }, }, + 0, ) var wg sync.WaitGroup @@ -775,7 +779,7 @@ func TestCacheMaxSize(t *testing.T) { "network_devices.netflow.reverse_dns_enrichment_enabled": true, "reverse_dns_enrichment.cache.max_size": 5, } - ts := testSetup(t, overrides, true, nil) + ts := testSetup(t, overrides, true, nil, 0) var wg sync.WaitGroup @@ -823,7 +827,7 @@ func TestCacheExpiration(t *testing.T) { "reverse_dns_enrichment.cache.entry_ttl": time.Duration(100) * time.Millisecond, 
"reverse_dns_enrichment.cache.clean_interval": time.Duration(1) * time.Second, } - ts := testSetup(t, overrides, true, nil) + ts := testSetup(t, overrides, true, nil, 0) var wg sync.WaitGroup @@ -874,7 +878,7 @@ func TestCachePersist(t *testing.T) { "run_path": t.TempDir(), } - ts := testSetup(t, overrides, true, nil) + ts := testSetup(t, overrides, true, nil, 0) var wg sync.WaitGroup @@ -923,7 +927,7 @@ func TestCachePersist(t *testing.T) { assert.NoError(t, ts.lc.Stop(ts.ctx)) // create new testsetup, validate that the IP address previously queried and cached is still cached - ts = testSetup(t, overrides, true, nil) + ts = testSetup(t, overrides, true, nil, 0) ts.validateExpectedGauge(t, "cache_size", 1.0) // cache hit should result in sync callback being called the first time the IP address is queried after @@ -955,7 +959,7 @@ func TestCachePersist(t *testing.T) { // create new testsetup with shorter entryTTL, validate that the IP address previously // cached has new shorter expiration time overrides["reverse_dns_enrichment.cache.entry_ttl"] = time.Duration(100) * time.Millisecond - ts = testSetup(t, overrides, true, nil) + ts = testSetup(t, overrides, true, nil, 0) ts.validateExpectedGauge(t, "cache_size", 1.0) time.Sleep(200 * time.Millisecond) @@ -991,7 +995,8 @@ func TestGetHostnameSync(t *testing.T) { overrides := map[string]interface{}{ "network_devices.netflow.reverse_dns_enrichment_enabled": true, } - ts := testSetup(t, overrides, true, nil) + + ts := testSetup(t, overrides, true, nil, 0) internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) // Test with invalid IP address @@ -1001,7 +1006,7 @@ func TestGetHostnameSync(t *testing.T) { // Test with IP address not in private range hostname, err = internalRDNSQuerier.GetHostnameSync("8.8.8.8") - assert.NoError(t, err) + assert.Error(t, err) assert.Equal(t, "", hostname) // Test with IP address in private range @@ -1014,7 +1019,7 @@ func TestGetHostnameSync(t *testing.T) { assert.NoError(t, err) 
assert.NotEqual(t, "", hostname) - // // Test with a valid IP address that resolves to a hostname + // Test with a valid IP address that resolves to a hostname hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.100") // cached from earlier test assert.NoError(t, err) assert.Equal(t, "fakehostname-192.168.1.100", hostname) @@ -1029,3 +1034,24 @@ func TestGetHostnameSync(t *testing.T) { assert.Error(t, err) assert.Equal(t, "", hostname) } + +func TestGetHostnameSyncTimeouts(t *testing.T) { + overrides := map[string]interface{}{ + "network_devices.netflow.reverse_dns_enrichment_enabled": true, + } + // Set up with a delay to simulate timeout + ts := testSetup(t, overrides, true, nil, 3*time.Second) + internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) + + // Test with a timeout exceeding the specified timeout limit + hostname, err := internalRDNSQuerier.GetHostnameSync("192.168.1.102", 1*time.Millisecond) + assert.Error(t, err) + assert.Contains(t, err.Error(), "timeout reached while resolving hostname for IP address 192.168.1.102") + assert.Equal(t, "", hostname) + + // Test with the default 2-second timeout + hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.103") + assert.Error(t, err) + assert.Contains(t, err.Error(), "timeout reached while resolving hostname for IP address 192.168.1.103") + assert.Equal(t, "", hostname) +} diff --git a/comp/rdnsquerier/impl/rdnsquerier_testutils.go b/comp/rdnsquerier/impl/rdnsquerier_testutils.go index a7adbf3217bfc..3769b68099c0e 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_testutils.go +++ b/comp/rdnsquerier/impl/rdnsquerier_testutils.go @@ -10,6 +10,7 @@ package rdnsquerierimpl import ( "context" "testing" + "time" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "go.uber.org/fx" @@ -35,9 +36,14 @@ type fakeResults struct { type fakeResolver struct { config *rdnsQuerierConfig fakeIPResults map[string]*fakeResults + delay time.Duration } func (r *fakeResolver) lookup(addr string) (string, 
error) { + if r.delay > 0 { + time.Sleep(r.delay) + } + fr, ok := r.fakeIPResults[addr] if ok && len(fr.errors) > 0 { err := fr.errors[0] @@ -60,7 +66,7 @@ type testState struct { logComp log.Component } -func testSetup(t *testing.T, overrides map[string]interface{}, start bool, fakeIPResults map[string]*fakeResults) *testState { +func testSetup(t *testing.T, overrides map[string]interface{}, start bool, fakeIPResults map[string]*fakeResults, delay time.Duration) *testState { lc := compdef.NewTestLifecycle(t) config := fxutil.Test[config.Component](t, fx.Options( @@ -95,13 +101,13 @@ func testSetup(t *testing.T, overrides map[string]interface{}, start bool, fakeI assert.NotNil(t, internalCache) internalQuerier := internalCache.querier.(*querierImpl) assert.NotNil(t, internalQuerier) - internalQuerier.resolver = &fakeResolver{internalRDNSQuerier.config, fakeIPResults} + internalQuerier.resolver = &fakeResolver{internalRDNSQuerier.config, fakeIPResults, delay} } else { internalCache := internalRDNSQuerier.cache.(*cacheNone) assert.NotNil(t, internalCache) internalQuerier := internalCache.querier.(*querierImpl) assert.NotNil(t, internalQuerier) - internalQuerier.resolver = &fakeResolver{internalRDNSQuerier.config, fakeIPResults} + internalQuerier.resolver = &fakeResolver{internalRDNSQuerier.config, fakeIPResults, delay} } if start { diff --git a/comp/rdnsquerier/mock/mock.go b/comp/rdnsquerier/mock/mock.go index 6bebec65200dc..ea84fd895b9af 100644 --- a/comp/rdnsquerier/mock/mock.go +++ b/comp/rdnsquerier/mock/mock.go @@ -12,6 +12,7 @@ import ( "fmt" "net" "net/netip" + "time" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" ) @@ -55,7 +56,7 @@ func (q *rdnsQuerierMock) GetHostname(ipAddr []byte, updateHostnameSync func(str // GetHostnameSync simulates resolving the hostname for the given IP address synchronously. If the IP address is in the private address // space then the resolved hostname is returned. 
-func (q *rdnsQuerierMock) GetHostnameSync(ipAddr string) (string, error) { +func (q *rdnsQuerierMock) GetHostnameSync(ipAddr string, timeout ...time.Duration) (string, error) { ipaddr := net.ParseIP(ipAddr).To4() if ipaddr == nil { return "", fmt.Errorf("invalid IP address %v", ipAddr) From 050bda0401d61979d5e1f47b454ac9a383e1d8d6 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Thu, 17 Oct 2024 21:30:39 -0400 Subject: [PATCH 09/32] Make it thread safe --- comp/rdnsquerier/impl/rdnsquerier.go | 8 ++++++ comp/rdnsquerier/impl/rdnsquerier_test.go | 33 +++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index f169678fefe38..d04263bf2cb29 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -76,6 +76,7 @@ type rdnsQuerierImpl struct { started bool cache cache + mutex sync.Mutex } // NewComponent creates a new rdnsquerier component @@ -217,6 +218,7 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string, timeout ...time.Duratio var hostname string var err error + var mutex sync.Mutex done := make(chan struct{}) go func() { @@ -226,12 +228,16 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string, timeout ...time.Duratio err = q.GetHostname( netipAddr, func(h string) { + mutex.Lock() hostname = h + mutex.Unlock() wg.Done() }, func(h string, e error) { + mutex.Lock() hostname = h err = e + mutex.Unlock() wg.Done() }, ) @@ -244,6 +250,8 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string, timeout ...time.Duratio select { case <-done: + mutex.Lock() + defer mutex.Unlock() return hostname, err case <-ctx.Done(): return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index b33fe2a2c50d1..a9d0f48eac809 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ 
b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -1055,3 +1055,36 @@ func TestGetHostnameSyncTimeouts(t *testing.T) { assert.Contains(t, err.Error(), "timeout reached while resolving hostname for IP address 192.168.1.103") assert.Equal(t, "", hostname) } + +func TestGetHostnameSyncThreadSafety(t *testing.T) { + overrides := map[string]interface{}{ + "network_devices.netflow.reverse_dns_enrichment_enabled": true, + } + ts := testSetup(t, overrides, true, nil, 0) + + internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) + assert.NotNil(t, internalRDNSQuerier) + + ipAddrs := []string{ + "192.168.1.100", + "192.168.1.101", + "192.168.1.102", + "192.168.1.103", + "192.168.1.104", + } + numGoroutines := 100 + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(i int) { + defer wg.Done() + ipAddr := ipAddrs[i%len(ipAddrs)] + hostname, err := internalRDNSQuerier.GetHostnameSync(ipAddr) + assert.NoError(t, err) + assert.Equal(t, "fakehostname-"+ipAddr, hostname) + }(i) + } + + wg.Wait() +} From 03aeb57a7c058a4ac7543135e17ac35bb541c576 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Thu, 17 Oct 2024 21:41:24 -0400 Subject: [PATCH 10/32] Make the linters :) --- comp/rdnsquerier/impl/rdnsquerier.go | 1 - comp/rdnsquerier/mock/mock.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index d04263bf2cb29..b83030c176dbd 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -76,7 +76,6 @@ type rdnsQuerierImpl struct { started bool cache cache - mutex sync.Mutex } // NewComponent creates a new rdnsquerier component diff --git a/comp/rdnsquerier/mock/mock.go b/comp/rdnsquerier/mock/mock.go index ea84fd895b9af..1ec1e86ec43ad 100644 --- a/comp/rdnsquerier/mock/mock.go +++ b/comp/rdnsquerier/mock/mock.go @@ -56,7 +56,7 @@ func (q *rdnsQuerierMock) GetHostname(ipAddr []byte, updateHostnameSync func(str // 
GetHostnameSync simulates resolving the hostname for the given IP address synchronously. If the IP address is in the private address // space then the resolved hostname is returned. -func (q *rdnsQuerierMock) GetHostnameSync(ipAddr string, timeout ...time.Duration) (string, error) { +func (q *rdnsQuerierMock) GetHostnameSync(ipAddr string, _ ...time.Duration) (string, error) { ipaddr := net.ParseIP(ipAddr).To4() if ipaddr == nil { return "", fmt.Errorf("invalid IP address %v", ipAddr) From 863074363cfee6fab001b2bc5308376f5b28de1f Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Thu, 17 Oct 2024 23:12:46 -0400 Subject: [PATCH 11/32] fix race condition --- comp/rdnsquerier/impl/rdnsquerier.go | 21 +++++-------- comp/rdnsquerier/impl/rdnsquerier_test.go | 37 ++--------------------- 2 files changed, 10 insertions(+), 48 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index b83030c176dbd..1d71ec5d41f7c 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -217,40 +217,35 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string, timeout ...time.Duratio var hostname string var err error - var mutex sync.Mutex + var mu sync.Mutex done := make(chan struct{}) go func() { defer close(done) - var wg sync.WaitGroup - wg.Add(1) err = q.GetHostname( netipAddr, func(h string) { - mutex.Lock() + mu.Lock() hostname = h - mutex.Unlock() - wg.Done() + err = nil + mu.Unlock() }, func(h string, e error) { - mutex.Lock() + mu.Lock() hostname = h err = e - mutex.Unlock() - wg.Done() + mu.Unlock() }, ) if err != nil { q.logger.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ipAddr, err) - wg.Done() } - wg.Wait() }() select { case <-done: - mutex.Lock() - defer mutex.Unlock() + mu.Lock() + defer mu.Unlock() return hostname, err case <-ctx.Done(): return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) diff --git 
a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index a9d0f48eac809..b6837e6b1fb17 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -1012,12 +1012,12 @@ func TestGetHostnameSync(t *testing.T) { // Test with IP address in private range hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.100") assert.NoError(t, err) - assert.NotEqual(t, "", hostname) + assert.Equal(t, "", hostname) // Test with IP address in private range but cache miss hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.101") assert.NoError(t, err) - assert.NotEqual(t, "", hostname) + assert.Equal(t, "", hostname) // Test with a valid IP address that resolves to a hostname hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.100") // cached from earlier test @@ -1055,36 +1055,3 @@ func TestGetHostnameSyncTimeouts(t *testing.T) { assert.Contains(t, err.Error(), "timeout reached while resolving hostname for IP address 192.168.1.103") assert.Equal(t, "", hostname) } - -func TestGetHostnameSyncThreadSafety(t *testing.T) { - overrides := map[string]interface{}{ - "network_devices.netflow.reverse_dns_enrichment_enabled": true, - } - ts := testSetup(t, overrides, true, nil, 0) - - internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) - assert.NotNil(t, internalRDNSQuerier) - - ipAddrs := []string{ - "192.168.1.100", - "192.168.1.101", - "192.168.1.102", - "192.168.1.103", - "192.168.1.104", - } - numGoroutines := 100 - var wg sync.WaitGroup - wg.Add(numGoroutines) - - for i := 0; i < numGoroutines; i++ { - go func(i int) { - defer wg.Done() - ipAddr := ipAddrs[i%len(ipAddrs)] - hostname, err := internalRDNSQuerier.GetHostnameSync(ipAddr) - assert.NoError(t, err) - assert.Equal(t, "fakehostname-"+ipAddr, hostname) - }(i) - } - - wg.Wait() -} From 032748441d58c88f2aab59e7194cde24e05d9105 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Fri, 18 Oct 2024 16:43:09 -0400 Subject: 
[PATCH 12/32] Fix timeout, added tests --- comp/rdnsquerier/def/component.go | 6 +- comp/rdnsquerier/impl-none/none.go | 4 +- comp/rdnsquerier/impl/rdnsquerier.go | 81 +++++++++---------- comp/rdnsquerier/impl/rdnsquerier_test.go | 60 +++++++++----- .../rdnsquerier/impl/rdnsquerier_testutils.go | 8 +- comp/rdnsquerier/mock/mock.go | 4 +- 6 files changed, 92 insertions(+), 71 deletions(-) diff --git a/comp/rdnsquerier/def/component.go b/comp/rdnsquerier/def/component.go index 5b020be1498b2..3cbc1ddf504b5 100644 --- a/comp/rdnsquerier/def/component.go +++ b/comp/rdnsquerier/def/component.go @@ -6,12 +6,14 @@ // Package rdnsquerier provides the reverse DNS querier component. package rdnsquerier -import "time" +import ( + "context" +) // team: network-device-monitoring // Component is the component type. type Component interface { GetHostname([]byte, func(string), func(string, error)) error - GetHostnameSync(string, ...time.Duration) (string, error) + GetHostnameSync(context.Context, string) (string, error) } diff --git a/comp/rdnsquerier/impl-none/none.go b/comp/rdnsquerier/impl-none/none.go index 5b4899191ac83..530649b5f873b 100644 --- a/comp/rdnsquerier/impl-none/none.go +++ b/comp/rdnsquerier/impl-none/none.go @@ -7,7 +7,7 @@ package rdnsquerierimpl import ( - "time" + "context" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" ) @@ -32,7 +32,7 @@ func (q *rdnsQuerierImplNone) GetHostname(_ []byte, _ func(string), _ func(strin return nil } -func (q *rdnsQuerierImplNone) GetHostnameSync(_ string, _ ...time.Duration) (string, error) { +func (q *rdnsQuerierImplNone) GetHostnameSync(_ context.Context, _ string) (string, error) { // noop return "", nil } diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index 1d71ec5d41f7c..9ac2f7d0ae1bf 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -12,7 +12,6 @@ import ( "net" "net/netip" "sync" - "time" 
"github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" @@ -189,19 +188,11 @@ func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(str // If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned. // If the IP address is in the private address space then the IP address will be resolved to a hostname. // The function accepts a timeout duration, which defaults to 2 seconds if not provided. -func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string, timeout ...time.Duration) (string, error) { - q.internalTelemetry.total.Inc() +func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (string, error) { - // Set default timeout to 2 seconds if not provided - var timeoutDuration time.Duration - if len(timeout) > 0 { - timeoutDuration = timeout[0] - } else { - timeoutDuration = 2 * time.Second - } + q.logger.Infof("ENTERED THE GetHostnameSync FUNCTION with IP address: %s", ipAddr) - ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) - defer cancel() + q.internalTelemetry.total.Inc() netipAddr := net.ParseIP(ipAddr).To4() if netipAddr == nil { @@ -216,39 +207,47 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ipAddr string, timeout ...time.Duratio q.internalTelemetry.private.Inc() var hostname string - var err error + var internalErr error var mu sync.Mutex done := make(chan struct{}) - go func() { - defer close(done) - err = q.GetHostname( - netipAddr, - func(h string) { - mu.Lock() - hostname = h - err = nil - mu.Unlock() - }, - func(h string, e error) { - mu.Lock() - hostname = h - err = e - mu.Unlock() - }, - ) - if err != nil { - q.logger.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ipAddr, err) - } - }() - - select { - case <-done: + asyncCallBack := func(h string, e error) { + q.logger.Info("ASYNC CALLBACK CALLED") mu.Lock() - defer mu.Unlock() - 
return hostname, err - case <-ctx.Done(): - return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) + hostname = h + internalErr = e + mu.Unlock() + q.logger.Infof("Async done with: %s", h) + close(done) + } + + err := q.GetHostname( + netipAddr, + func(h string) { + mu.Lock() + hostname = h + mu.Unlock() + q.logger.Infof("Sync done with: %s", h) + close(done) + }, + asyncCallBack, + ) + if err != nil { + q.logger.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ipAddr, err) + } + + for { + select { + case <-done: + mu.Lock() + defer mu.Unlock() + q.logger.Infof("Returning hostname: %s, err: %v", hostname, err) + q.logger.Infof("EXITING THE GetHostnameSync FUNCTION with internalErr: %v", internalErr) + return hostname, err + case <-ctx.Done(): + q.logger.Infof("Timed out!") + return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) + } } } diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index b6837e6b1fb17..69db240277448 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -3,9 +3,12 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2024-present Datadog, Inc. 
+//go:build test + package rdnsquerierimpl import ( + "context" "fmt" "net" "sync" @@ -999,40 +1002,56 @@ func TestGetHostnameSync(t *testing.T) { ts := testSetup(t, overrides, true, nil, 0) internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) - // Test with invalid IP address - hostname, err := internalRDNSQuerier.GetHostnameSync("invalid_ip") + // Vic to reformat this test + // tests := []struct { + // description string + // ip string + // expected string + // errMsg string + // }{ + // {}, + // } + + ctx, cancel := context.WithTimeout(ts.ctx, 100*time.Second) + var hostname string + var err error + + // // Test with invalid IP address + hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "invalid_ip") assert.Error(t, err) assert.Equal(t, "", hostname) - // Test with IP address not in private range - hostname, err = internalRDNSQuerier.GetHostnameSync("8.8.8.8") + //Test with IP address not in private range + hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "8.8.8.8") assert.Error(t, err) assert.Equal(t, "", hostname) - // Test with IP address in private range - hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.100") + // // Test with IP address in private range + hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.100") assert.NoError(t, err) - assert.Equal(t, "", hostname) + assert.Equal(t, "fakehostname-192.168.1.100", hostname) - // Test with IP address in private range but cache miss - hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.101") + // Test with IP address in private range but cache miss, async look up succeeds + hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.101") assert.NoError(t, err) - assert.Equal(t, "", hostname) + assert.Equal(t, "fakehostname-192.168.1.101", hostname) - // Test with a valid IP address that resolves to a hostname - hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.100") // cached from earlier test + // Cache Hit + hostname, err = 
internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.100") // cached from earlier test assert.NoError(t, err) assert.Equal(t, "fakehostname-192.168.1.100", hostname) // Test with an empty string as input - hostname, err = internalRDNSQuerier.GetHostnameSync("") + hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "") assert.Error(t, err) assert.Equal(t, "", hostname) // Test with an IPv6 address - hostname, err = internalRDNSQuerier.GetHostnameSync("2001:4860:4860::8888") // Google Public DNS + hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "2001:4860:4860::8888") // Google Public DNS assert.Error(t, err) assert.Equal(t, "", hostname) + + cancel() } func TestGetHostnameSyncTimeouts(t *testing.T) { @@ -1042,16 +1061,13 @@ func TestGetHostnameSyncTimeouts(t *testing.T) { // Set up with a delay to simulate timeout ts := testSetup(t, overrides, true, nil, 3*time.Second) internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) + ctx, cancel := context.WithTimeout(ts.ctx, 1*time.Millisecond) // Test with a timeout exceeding the specified timeout limit - hostname, err := internalRDNSQuerier.GetHostnameSync("192.168.1.102", 1*time.Millisecond) - assert.Error(t, err) - assert.Contains(t, err.Error(), "timeout reached while resolving hostname for IP address 192.168.1.102") + hostname, err := internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.100") assert.Equal(t, "", hostname) - - // Test with the default 2-second timeout - hostname, err = internalRDNSQuerier.GetHostnameSync("192.168.1.103") assert.Error(t, err) - assert.Contains(t, err.Error(), "timeout reached while resolving hostname for IP address 192.168.1.103") - assert.Equal(t, "", hostname) + assert.Contains(t, err.Error(), "timeout reached while resolving hostname for IP address 192.168.1.100") + + cancel() } diff --git a/comp/rdnsquerier/impl/rdnsquerier_testutils.go b/comp/rdnsquerier/impl/rdnsquerier_testutils.go index 3769b68099c0e..3b3f9cf2359b6 100644 --- 
a/comp/rdnsquerier/impl/rdnsquerier_testutils.go +++ b/comp/rdnsquerier/impl/rdnsquerier_testutils.go @@ -37,9 +37,13 @@ type fakeResolver struct { config *rdnsQuerierConfig fakeIPResults map[string]*fakeResults delay time.Duration + logger log.Component } func (r *fakeResolver) lookup(addr string) (string, error) { + + r.logger.Infof("fakeResolver.lookup(%s) with timeout %d", addr, r.delay) + if r.delay > 0 { time.Sleep(r.delay) } @@ -101,13 +105,13 @@ func testSetup(t *testing.T, overrides map[string]interface{}, start bool, fakeI assert.NotNil(t, internalCache) internalQuerier := internalCache.querier.(*querierImpl) assert.NotNil(t, internalQuerier) - internalQuerier.resolver = &fakeResolver{internalRDNSQuerier.config, fakeIPResults, delay} + internalQuerier.resolver = &fakeResolver{internalRDNSQuerier.config, fakeIPResults, delay, logComp} } else { internalCache := internalRDNSQuerier.cache.(*cacheNone) assert.NotNil(t, internalCache) internalQuerier := internalCache.querier.(*querierImpl) assert.NotNil(t, internalQuerier) - internalQuerier.resolver = &fakeResolver{internalRDNSQuerier.config, fakeIPResults, delay} + internalQuerier.resolver = &fakeResolver{internalRDNSQuerier.config, fakeIPResults, delay, logComp} } if start { diff --git a/comp/rdnsquerier/mock/mock.go b/comp/rdnsquerier/mock/mock.go index 1ec1e86ec43ad..80152c49ac847 100644 --- a/comp/rdnsquerier/mock/mock.go +++ b/comp/rdnsquerier/mock/mock.go @@ -9,10 +9,10 @@ package mock import ( + "context" "fmt" "net" "net/netip" - "time" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" ) @@ -56,7 +56,7 @@ func (q *rdnsQuerierMock) GetHostname(ipAddr []byte, updateHostnameSync func(str // GetHostnameSync simulates resolving the hostname for the given IP address synchronously. If the IP address is in the private address // space then the resolved hostname is returned. 
-func (q *rdnsQuerierMock) GetHostnameSync(ipAddr string, _ ...time.Duration) (string, error) { +func (q *rdnsQuerierMock) GetHostnameSync(_ context.Context, ipAddr string) (string, error) { ipaddr := net.ParseIP(ipAddr).To4() if ipaddr == nil { return "", fmt.Errorf("invalid IP address %v", ipAddr) From 05da484798e6ee7e89687af2d7d453d0c8024fa8 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Mon, 21 Oct 2024 09:43:12 -0400 Subject: [PATCH 13/32] Remove debug logs --- comp/rdnsquerier/impl/rdnsquerier.go | 49 +++++++------------ .../internal/report/report_device_metadata.go | 5 +- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index 9ac2f7d0ae1bf..0c64f8db3b2e5 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -189,7 +189,6 @@ func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(str // If the IP address is in the private address space then the IP address will be resolved to a hostname. // The function accepts a timeout duration, which defaults to 2 seconds if not provided. 
func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (string, error) { - q.logger.Infof("ENTERED THE GetHostnameSync FUNCTION with IP address: %s", ipAddr) q.internalTelemetry.total.Inc() @@ -206,48 +205,38 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (s } q.internalTelemetry.private.Inc() - var hostname string - var internalErr error - var mu sync.Mutex - done := make(chan struct{}) - - asyncCallBack := func(h string, e error) { - q.logger.Info("ASYNC CALLBACK CALLED") - mu.Lock() - hostname = h - internalErr = e - mu.Unlock() - q.logger.Infof("Async done with: %s", h) - close(done) - } + var ( + hostname string + mu sync.Mutex + done = make(chan struct{}) + ) err := q.GetHostname( netipAddr, func(h string) { mu.Lock() + defer mu.Unlock() + hostname = h + close(done) + }, + func(h string, e error) { + mu.Lock() + defer mu.Unlock() hostname = h - mu.Unlock() - q.logger.Infof("Sync done with: %s", h) close(done) }, - asyncCallBack, ) if err != nil { q.logger.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ipAddr, err) } - for { - select { - case <-done: - mu.Lock() - defer mu.Unlock() - q.logger.Infof("Returning hostname: %s, err: %v", hostname, err) - q.logger.Infof("EXITING THE GetHostnameSync FUNCTION with internalErr: %v", internalErr) - return hostname, err - case <-ctx.Done(): - q.logger.Infof("Timed out!") - return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) - } + select { + case <-done: + mu.Lock() + defer mu.Unlock() + return hostname, err + case <-ctx.Done(): + return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) } } diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index e6fe2f394ce15..fc84181923d61 100644 --- 
a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -6,6 +6,7 @@ package report import ( + "context" json "encoding/json" "net" "sort" @@ -217,9 +218,9 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc hostname := "" if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { - hostname, _ = rdnsquerier.GetHostnameSync(config.IPAddress) + ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond) + hostname, _ = rdnsquerier.GetHostnameSync(ctx, config.IPAddress) } - log.Infof("FOUND HOSTNAME: %s", hostname) return devicemetadata.DeviceMetadata{ ID: deviceID, From 1638c35c0b584b1a171086974cbd9b10b2df8644 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Mon, 21 Oct 2024 09:49:42 -0400 Subject: [PATCH 14/32] Log errors --- .../snmp/internal/report/report_device_metadata.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index fc84181923d61..32b06d20010e5 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -219,7 +219,10 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc hostname := "" if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond) - hostname, _ = rdnsquerier.GetHostnameSync(ctx, config.IPAddress) + hostname, err = rdnsquerier.GetHostnameSync(ctx, config.IPAddress) + if err != nil { + log.Info("Error getting hostname: %v", err) + } } return devicemetadata.DeviceMetadata{ From d2f57ae642808db5c9e79f2a7ea770a492cb3930 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Mon, 21 Oct 2024 10:38:15 -0400 Subject: [PATCH 15/32] 
Linter fixes --- comp/rdnsquerier/impl/rdnsquerier.go | 2 +- .../corechecks/snmp/internal/report/report_device_metadata.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index 0c64f8db3b2e5..ce1c6b9547f05 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -219,7 +219,7 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (s hostname = h close(done) }, - func(h string, e error) { + func(h string, _ error) { mu.Lock() defer mu.Unlock() hostname = h diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index 32b06d20010e5..f25f5352714b9 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -218,11 +218,12 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc hostname := "" if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { - ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond) + ctx, ctx_cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) hostname, err = rdnsquerier.GetHostnameSync(ctx, config.IPAddress) if err != nil { log.Info("Error getting hostname: %v", err) } + ctx_cancel() } return devicemetadata.DeviceMetadata{ From a4563759d9f5abc33f19d671294e135525c567c9 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Mon, 21 Oct 2024 11:30:38 -0400 Subject: [PATCH 16/32] More linting --- .../corechecks/snmp/internal/report/report_device_metadata.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index f25f5352714b9..7c82d002e203e 100644 --- 
a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -218,12 +218,12 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc hostname := "" if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { - ctx, ctx_cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + ctx, ctxCancel := context.WithTimeout(context.Background(), 1*time.Millisecond) hostname, err = rdnsquerier.GetHostnameSync(ctx, config.IPAddress) if err != nil { log.Info("Error getting hostname: %v", err) } - ctx_cancel() + ctxCancel() } return devicemetadata.DeviceMetadata{ From d115431a3bbd6c332f996f6d40019776637c5a0e Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Tue, 22 Oct 2024 19:50:46 -0400 Subject: [PATCH 17/32] Update releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml Co-authored-by: Bryce Eadie --- .../notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml b/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml index d63e7389e3a58..c934be9d9a97b 100644 --- a/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml +++ b/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml @@ -8,4 +8,4 @@ --- features: - | - Added support for enrighing SNMP IPs with hostnames via rDNS lookups + Added support for enriching SNMP IPs with hostnames through rDNS lookups From 0f734f16bbbe190cca5af31473929a2c277946e4 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Wed, 23 Oct 2024 15:14:40 -0400 Subject: [PATCH 18/32] rename GetHostname to GetHostnameAsync --- .../netflow/flowaggregator/flowaccumulator.go | 4 +- comp/rdnsquerier/def/component.go | 2 +- comp/rdnsquerier/impl-none/none.go | 2 +- comp/rdnsquerier/impl/rdnsquerier.go | 26 +++---- 
comp/rdnsquerier/impl/rdnsquerier_test.go | 76 +++++++++---------- comp/rdnsquerier/mock/mock.go | 4 +- 6 files changed, 55 insertions(+), 59 deletions(-) diff --git a/comp/netflow/flowaggregator/flowaccumulator.go b/comp/netflow/flowaggregator/flowaccumulator.go index 39329b91913b3..c2fca641401e4 100644 --- a/comp/netflow/flowaggregator/flowaccumulator.go +++ b/comp/netflow/flowaggregator/flowaccumulator.go @@ -194,7 +194,7 @@ func (f *flowAccumulator) setDstReverseDNSHostname(aggHash uint64, hostname stri } func (f *flowAccumulator) addRDNSEnrichment(aggHash uint64, srcAddr []byte, dstAddr []byte) { - err := f.rdnsQuerier.GetHostname( + err := f.rdnsQuerier.GetHostnameAsync( srcAddr, // Sync callback, lock is already held func(hostname string) { @@ -213,7 +213,7 @@ func (f *flowAccumulator) addRDNSEnrichment(aggHash uint64, srcAddr []byte, dstA f.logger.Debugf("Error requesting reverse DNS enrichment for source IP address: %v error: %v", srcAddr, err) } - err = f.rdnsQuerier.GetHostname( + err = f.rdnsQuerier.GetHostnameAsync( dstAddr, // Sync callback, lock is held func(hostname string) { diff --git a/comp/rdnsquerier/def/component.go b/comp/rdnsquerier/def/component.go index 3cbc1ddf504b5..acb35267041e9 100644 --- a/comp/rdnsquerier/def/component.go +++ b/comp/rdnsquerier/def/component.go @@ -14,6 +14,6 @@ import ( // Component is the component type. 
type Component interface { - GetHostname([]byte, func(string), func(string, error)) error + GetHostnameAsync([]byte, func(string), func(string, error)) error GetHostnameSync(context.Context, string) (string, error) } diff --git a/comp/rdnsquerier/impl-none/none.go b/comp/rdnsquerier/impl-none/none.go index 530649b5f873b..eff12d040a1a2 100644 --- a/comp/rdnsquerier/impl-none/none.go +++ b/comp/rdnsquerier/impl-none/none.go @@ -27,7 +27,7 @@ func NewNone() Provides { } // GetHostnameAsync does nothing for the noop rdnsquerier implementation -func (q *rdnsQuerierImplNone) GetHostname(_ []byte, _ func(string), _ func(string, error)) error { +func (q *rdnsQuerierImplNone) GetHostnameAsync(_ []byte, _ func(string), _ func(string, error)) error { // noop return nil } diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index ce1c6b9547f05..a4ffc59ceb309 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -9,7 +9,6 @@ package rdnsquerierimpl import ( "context" "fmt" - "net" "net/netip" "sync" @@ -159,7 +158,7 @@ func NewComponent(reqs Requires) (Provides, error) { // If the hostname for the IP address is immediately available (i.e. cache is enabled and entry is cached) then the updateHostnameSync callback // will be invoked synchronously, otherwise a query is sent to a channel to be processed asynchronously. If the channel is full then an error // is returned. When the request completes the updateHostnameAsync callback will be invoked asynchronously. 
-func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(string), updateHostnameAsync func(string, error)) error { +func (q *rdnsQuerierImpl) GetHostnameAsync(ipAddr []byte, updateHostnameSync func(string), updateHostnameAsync func(string, error)) error { q.internalTelemetry.total.Inc() netipAddr, ok := netip.AddrFromSlice(ipAddr) @@ -169,7 +168,7 @@ func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(str } if !netipAddr.IsPrivate() { - q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", netipAddr) + q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr) return nil } q.internalTelemetry.private.Inc() @@ -177,10 +176,9 @@ func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(str err := q.cache.getHostname(netipAddr.String(), updateHostnameSync, updateHostnameAsync) if err != nil { q.logger.Debugf("Reverse DNS Enrichment cache.getHostname() for addr %s returned error: %v", netipAddr.String(), err) - return err } - return nil + return err } // GetHostnameSync attempts to resolve the hostname for the given IP address synchronously. @@ -189,19 +187,17 @@ func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(str // If the IP address is in the private address space then the IP address will be resolved to a hostname. // The function accepts a timeout duration, which defaults to 2 seconds if not provided. 
func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (string, error) { - q.logger.Infof("ENTERED THE GetHostnameSync FUNCTION with IP address: %s", ipAddr) - q.internalTelemetry.total.Inc() - netipAddr := net.ParseIP(ipAddr).To4() - if netipAddr == nil { + netipAddr, err := netip.ParseAddr(ipAddr) + if err != nil { q.internalTelemetry.invalidIPAddress.Inc() - return "", fmt.Errorf("invalid IP address %v", ipAddr) + return "", fmt.Errorf("invalid IP address %s: %v", ipAddr, err) } if !netipAddr.IsPrivate() { - q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", netipAddr) - return "", fmt.Errorf("IP address %v is not in the private address space", netipAddr) + q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr) + return "", nil } q.internalTelemetry.private.Inc() @@ -211,8 +207,8 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (s done = make(chan struct{}) ) - err := q.GetHostname( - netipAddr, + err = q.cache.getHostname( + netipAddr.String(), func(h string) { mu.Lock() defer mu.Unlock() @@ -227,7 +223,7 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (s }, ) if err != nil { - q.logger.Tracef("Error resolving reverse DNS enrichment for source IP address: %v error: %v", ipAddr, err) + q.logger.Debugf("Reverse DNS Enrichment cache.getHostname() for addr %s returned error: %v", netipAddr.String(), err) } select { diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index 69db240277448..b8b8a4dc77cba 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -44,7 +44,7 @@ func TestNotStarted(t *testing.T) { ts := testSetup(t, overrides, false, nil, 0) // IP address in private range - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { 
assert.FailNow(t, "Sync callback should not be called when rdnsquerier is not started") @@ -74,7 +74,7 @@ func TestNormalOperationsDefaultConfig(t *testing.T) { var wg sync.WaitGroup // Invalid IP address - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{1, 2, 3}, func(_ string) { assert.FailNow(t, "Sync callback should not be called for invalid IP address") @@ -86,7 +86,7 @@ func TestNormalOperationsDefaultConfig(t *testing.T) { assert.ErrorContains(t, err, "invalid IP address") // IP address not in private range - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{8, 8, 8, 8}, func(_ string) { assert.FailNow(t, "Sync callback should not be called for IP address not in private range") @@ -99,7 +99,7 @@ func TestNormalOperationsDefaultConfig(t *testing.T) { // IP address in private range - async callback should be called the first time an IP address is queried wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -115,7 +115,7 @@ func TestNormalOperationsDefaultConfig(t *testing.T) { // IP address in private range - cache hit should result in sync callback being called the second time an IP address is queried wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(hostname string) { assert.Equal(t, "fakehostname-192.168.1.100", hostname) @@ -151,7 +151,7 @@ func TestNormalOperationsCacheDisabled(t *testing.T) { var wg sync.WaitGroup // Invalid IP address - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{1, 2, 3}, func(_ string) { assert.FailNow(t, "Sync callback should not be called for invalid IP address") @@ -163,7 +163,7 @@ func TestNormalOperationsCacheDisabled(t *testing.T) { assert.ErrorContains(t, err, "invalid IP address") // IP address not in private range - err 
= ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{8, 8, 8, 8}, func(_ string) { assert.FailNow(t, "Sync callback should not be called for IP address not in private range") @@ -176,7 +176,7 @@ func TestNormalOperationsCacheDisabled(t *testing.T) { // IP address in private range - with cache disabled the async callback should be called every time wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -191,7 +191,7 @@ func TestNormalOperationsCacheDisabled(t *testing.T) { wg.Wait() wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -228,7 +228,7 @@ func TestRateLimiter(t *testing.T) { // IP addresses in private range for i := range 20 { - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -291,7 +291,7 @@ func TestRateLimiterThrottled(t *testing.T) { wg.Add(20) start := time.Now() for i := range 20 { - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -321,7 +321,7 @@ func TestRateLimiterThrottled(t *testing.T) { // These queries will get errors, exceeding throttle_error_threshold, which will cause the rate limiter to throttle down wg.Add(2) for i := 30; i < 32; i++ { - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -353,7 +353,7 @@ func TestRateLimiterThrottled(t *testing.T) { wg.Add(20) start = time.Now() for i := range 20 { - err := ts.rdnsQuerier.GetHostname( + err := 
ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(hostname string) { assert.Equal(t, fmt.Sprintf("fakehostname-192.168.1.%d", i), hostname) @@ -386,7 +386,7 @@ func TestRateLimiterThrottled(t *testing.T) { wg.Add(6) start = time.Now() for i := 40; i < 46; i++ { - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -420,7 +420,7 @@ func TestRateLimiterThrottled(t *testing.T) { wg.Add(10) start = time.Now() for i := 50; i < 60; i++ { - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -470,7 +470,7 @@ func TestChannelFullRequestsDroppedWhenRateLimited(t *testing.T) { wg.Add(1) // only wait for one callback, most or all of the other requests will be dropped var once sync.Once for i := range 20 { - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -519,7 +519,7 @@ func TestCacheHitInProgress(t *testing.T) { for range 10 { wg.Add(1) - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(hostname string) { assert.Equal(t, "fakehostname-192.168.1.100", hostname) @@ -534,7 +534,7 @@ func TestCacheHitInProgress(t *testing.T) { assert.NoError(t, err) wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 101}, func(hostname string) { assert.Equal(t, "fakehostname-192.168.1.101", hostname) @@ -591,7 +591,7 @@ func TestRetries(t *testing.T) { var wg sync.WaitGroup wg.Add(2) - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -604,7 +604,7 
@@ func TestRetries(t *testing.T) { ) assert.NoError(t, err) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 101}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -620,7 +620,7 @@ func TestRetries(t *testing.T) { // both were within retry limits so should be cached wg.Add(2) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(hostname string) { assert.Equal(t, "fakehostname-192.168.1.100", hostname) @@ -631,7 +631,7 @@ func TestRetries(t *testing.T) { }, ) assert.NoError(t, err) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 101}, func(hostname string) { assert.Equal(t, "fakehostname-192.168.1.101", hostname) @@ -679,7 +679,7 @@ func TestRetriesExceeded(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -694,7 +694,7 @@ func TestRetriesExceeded(t *testing.T) { // Because an error was returned for all available retries this IP address should now have hostname "" cached wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(hostname string) { assert.Equal(t, "", hostname) @@ -737,7 +737,7 @@ func TestIsNotFound(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -752,7 +752,7 @@ func TestIsNotFound(t *testing.T) { wg.Wait() wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(hostname string) { assert.Equal(t, "", hostname) @@ -790,7 +790,7 @@ func TestCacheMaxSize(t *testing.T) { num := 20 wg.Add(num) for i := 
range num { - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(hostname string) { assert.Equal(t, fmt.Sprintf("fakehostname-192.168.1.%d", i), hostname) @@ -838,7 +838,7 @@ func TestCacheExpiration(t *testing.T) { num := 100 wg.Add(num) for i := range num { - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, byte(i)}, func(hostname string) { assert.Equal(t, fmt.Sprintf("fakehostname-192.168.1.%d", i), hostname) @@ -887,7 +887,7 @@ func TestCachePersist(t *testing.T) { // async callback should be called the first time an IP address is queried wg.Add(1) - err := ts.rdnsQuerier.GetHostname( + err := ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -903,7 +903,7 @@ func TestCachePersist(t *testing.T) { // cache hit should result in sync callback being called the second time an IP address is queried wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(hostname string) { assert.Equal(t, "fakehostname-192.168.1.100", hostname) @@ -936,7 +936,7 @@ func TestCachePersist(t *testing.T) { // cache hit should result in sync callback being called the first time the IP address is queried after // restart because persistent cache should have been loaded wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(hostname string) { assert.Equal(t, "fakehostname-192.168.1.100", hostname) @@ -969,7 +969,7 @@ func TestCachePersist(t *testing.T) { // cache_hit_expired should result in async callback being called wg.Add(1) - err = ts.rdnsQuerier.GetHostname( + err = ts.rdnsQuerier.GetHostnameAsync( []byte{192, 168, 1, 100}, func(_ string) { assert.FailNow(t, "Sync callback should not be called") @@ -1022,9 +1022,9 @@ func TestGetHostnameSync(t *testing.T) { assert.Equal(t, 
"", hostname) //Test with IP address not in private range - hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "8.8.8.8") - assert.Error(t, err) - assert.Equal(t, "", hostname) + // hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "8.8.8.8") + // assert.Error(t, err) + // assert.Equal(t, "", hostname) // // Test with IP address in private range hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.100") @@ -1047,9 +1047,9 @@ func TestGetHostnameSync(t *testing.T) { assert.Equal(t, "", hostname) // Test with an IPv6 address - hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "2001:4860:4860::8888") // Google Public DNS - assert.Error(t, err) - assert.Equal(t, "", hostname) + // hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "2001:4860:4860::8888") // Google Public DNS + // assert.Error(t, err) + // assert.Equal(t, "", hostname) cancel() } diff --git a/comp/rdnsquerier/mock/mock.go b/comp/rdnsquerier/mock/mock.go index 80152c49ac847..d7665197ada6a 100644 --- a/comp/rdnsquerier/mock/mock.go +++ b/comp/rdnsquerier/mock/mock.go @@ -29,10 +29,10 @@ func NewMock() rdnsquerier.Component { return &rdnsQuerierMock{} } -// GetHostname simulates resolving the hostname for the given IP address. If the IP address is in the private address +// GetHostnameAsync simulates resolving the hostname for the given IP address. If the IP address is in the private address // space then, depending on the IP address, either the updateHostnameSync callback will be invoked synchronously as if // there was a cache hit, or the updateHostnameAsync callback will be invoked asynchronously with the simulated resolved hostname. 
-func (q *rdnsQuerierMock) GetHostname(ipAddr []byte, updateHostnameSync func(string), updateHostnameAsync func(string, error)) error { +func (q *rdnsQuerierMock) GetHostnameAsync(ipAddr []byte, updateHostnameSync func(string), updateHostnameAsync func(string, error)) error { ipaddr, ok := netip.AddrFromSlice(ipAddr) if !ok { return fmt.Errorf("invalid IP address %v", ipAddr) From b2c0cca3318a1fb67ee833ee0c80727ce3093c48 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Wed, 23 Oct 2024 15:42:02 -0400 Subject: [PATCH 19/32] use channels instead of a lock --- comp/rdnsquerier/impl/rdnsquerier.go | 31 ++++++++++++---------------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index a4ffc59ceb309..1dcbb02c5a6e0 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -10,7 +10,6 @@ import ( "context" "fmt" "net/netip" - "sync" "github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" @@ -18,6 +17,7 @@ import ( compdef "github.com/DataDog/datadog-agent/comp/def" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" rdnsquerierimplnone "github.com/DataDog/datadog-agent/comp/rdnsquerier/impl-none" + "go.uber.org/multierr" ) // Requires defines the dependencies for the rdnsquerier component @@ -201,25 +201,20 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (s } q.internalTelemetry.private.Inc() - var ( - hostname string - mu sync.Mutex - done = make(chan struct{}) - ) + hostnameChan := make(chan string, 1) + asyncErrChan := make(chan error, 1) err = q.cache.getHostname( netipAddr.String(), + // only 1 of these callbacks will be executed + // so we don't risk a panic here func(h string) { - mu.Lock() - defer mu.Unlock() - hostname = h - close(done) + hostnameChan <- h + asyncErrChan <- nil }, - func(h string, _ error) { - mu.Lock() - defer 
mu.Unlock() - hostname = h - close(done) + func(h string, e error) { + hostnameChan <- h + asyncErrChan <- e }, ) if err != nil { @@ -227,9 +222,9 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (s } select { - case <-done: - mu.Lock() - defer mu.Unlock() + case hostname := <-hostnameChan: + asyncErr := <-asyncErrChan // this is okay because we know that as soon as we send hostname, we send asyncErr + err = multierr.Append(err, asyncErr) return hostname, err case <-ctx.Done(): return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) From 4d295455a489a22bc4f2eda8f614777d3021f68f Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Wed, 23 Oct 2024 16:02:20 -0400 Subject: [PATCH 20/32] use table test for unit test --- comp/rdnsquerier/impl/rdnsquerier_test.go | 113 ++++++++++++---------- 1 file changed, 64 insertions(+), 49 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index b8b8a4dc77cba..637cf87ceb6f9 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // Test that the rdnsQuerier starts and stops as expected. 
@@ -1002,56 +1003,70 @@ func TestGetHostnameSync(t *testing.T) { ts := testSetup(t, overrides, true, nil, 0) internalRDNSQuerier := ts.rdnsQuerier.(*rdnsQuerierImpl) - // Vic to reformat this test - // tests := []struct { - // description string - // ip string - // expected string - // errMsg string - // }{ - // {}, - // } - - ctx, cancel := context.WithTimeout(ts.ctx, 100*time.Second) - var hostname string - var err error - - // // Test with invalid IP address - hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "invalid_ip") - assert.Error(t, err) - assert.Equal(t, "", hostname) - - //Test with IP address not in private range - // hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "8.8.8.8") - // assert.Error(t, err) - // assert.Equal(t, "", hostname) - - // // Test with IP address in private range - hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.100") - assert.NoError(t, err) - assert.Equal(t, "fakehostname-192.168.1.100", hostname) - - // Test with IP address in private range but cache miss, async look up succeeds - hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.101") - assert.NoError(t, err) - assert.Equal(t, "fakehostname-192.168.1.101", hostname) - - // Cache Hit - hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.100") // cached from earlier test - assert.NoError(t, err) - assert.Equal(t, "fakehostname-192.168.1.100", hostname) - - // Test with an empty string as input - hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "") - assert.Error(t, err) - assert.Equal(t, "", hostname) - - // Test with an IPv6 address - // hostname, err = internalRDNSQuerier.GetHostnameSync(ctx, "2001:4860:4860::8888") // Google Public DNS - // assert.Error(t, err) - // assert.Equal(t, "", hostname) + tts := map[string]struct { + ip string + timeout time.Duration + expected string + errMsg string + }{ + "invalid_ip should error": { + ip: "invalid_ip", + timeout: 1 * time.Second, + expected: "", + errMsg: 
"invalid IP address", + }, + "public IPv4 should return empty no error": { + ip: "8.8.8.8", + timeout: 1 * time.Second, + expected: "", + errMsg: "", + }, + "private IPv4 not in cache should return hostname": { + ip: "192.168.1.100", + timeout: 1 * time.Second, + expected: "fakehostname-192.168.1.100", + errMsg: "", + }, + "private IPv4 in cache should return hostname": { + ip: "192.168.1.100", + timeout: 1 * time.Second, + expected: "fakehostname-192.168.1.100", + errMsg: "", + }, + "public IPv6 should return empty no error": { + ip: "2001:4860:4860::8888", + timeout: 1 * time.Second, + expected: "", + errMsg: "", + }, + "private IPv6 not in cache should return hostname": { + ip: "fd00::1", + timeout: 1 * time.Second, + expected: "fakehostname-fd00::1", + errMsg: "", + }, + "private IPv6 in cache should return hostname": { + ip: "fd00::1", + timeout: 1 * time.Second, + expected: "fakehostname-fd00::1", + errMsg: "", + }, + } - cancel() + for name, tt := range tts { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(ts.ctx, tt.timeout) + defer cancel() + hostname, err := internalRDNSQuerier.GetHostnameSync(ctx, tt.ip) + if tt.errMsg != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errMsg) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.expected, hostname) + }) + } } func TestGetHostnameSyncTimeouts(t *testing.T) { From 5df33c069bdf070b4f6c9210e2549a627088c934 Mon Sep 17 00:00:00 2001 From: Vic Weiss Date: Wed, 23 Oct 2024 17:21:09 -0400 Subject: [PATCH 21/32] Remove rDNS querier from SNMP core check (#30439) --- cmd/agent/subcommands/run/command.go | 5 ---- pkg/cli/subcommands/check/command.go | 5 ---- pkg/collector/check/context.go | 26 +------------------ .../internal/report/report_device_metadata.go | 13 ---------- pkg/networkdevice/metadata/payload.go | 1 - ...-hostname-enrichment-1ca16478f8ebeeac.yaml | 11 -------- 6 files changed, 1 insertion(+), 60 deletions(-) delete mode 100644 
releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml diff --git a/cmd/agent/subcommands/run/command.go b/cmd/agent/subcommands/run/command.go index 4877a930811b9..261cb79e7a72e 100644 --- a/cmd/agent/subcommands/run/command.go +++ b/cmd/agent/subcommands/run/command.go @@ -116,7 +116,6 @@ import ( "github.com/DataDog/datadog-agent/comp/otelcol/logsagentpipeline" processAgent "github.com/DataDog/datadog-agent/comp/process/agent" processagentStatusImpl "github.com/DataDog/datadog-agent/comp/process/status/statusimpl" - rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" rdnsquerierfx "github.com/DataDog/datadog-agent/comp/rdnsquerier/fx" remoteconfig "github.com/DataDog/datadog-agent/comp/remote-config" "github.com/DataDog/datadog-agent/comp/remote-config/rcclient" @@ -258,7 +257,6 @@ func run(log log.Component, settings settings.Component, _ optional.Option[gui.Component], _ agenttelemetry.Component, - rdnsquerier rdnsquerier.Component, ) error { defer func() { stopAgent() @@ -323,7 +321,6 @@ func run(log log.Component, cloudfoundrycontainer, jmxlogger, settings, - rdnsquerier, ); err != nil { return err } @@ -499,7 +496,6 @@ func startAgent( _ cloudfoundrycontainer.Component, jmxLogger jmxlogger.Component, settings settings.Component, - rdnsquerier rdnsquerier.Component, ) error { var err error @@ -577,7 +573,6 @@ func startAgent( // TODO: (components) - Until the checks are components we set there context so they can depends on components. 
check.InitializeInventoryChecksContext(invChecks) - check.InitializeRDNSQuerierContext(rdnsquerier) // Init JMX runner and inject dogstatsd component jmxfetch.InitRunner(server, jmxLogger) diff --git a/pkg/cli/subcommands/check/command.go b/pkg/cli/subcommands/check/command.go index cd720051f1afa..8a70b5eb3e2c4 100644 --- a/pkg/cli/subcommands/check/command.go +++ b/pkg/cli/subcommands/check/command.go @@ -64,8 +64,6 @@ import ( integrations "github.com/DataDog/datadog-agent/comp/logs/integrations/def" "github.com/DataDog/datadog-agent/comp/metadata/inventorychecks" "github.com/DataDog/datadog-agent/comp/metadata/inventorychecks/inventorychecksimpl" - rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" - rdnsquerierfx "github.com/DataDog/datadog-agent/comp/rdnsquerier/fx" "github.com/DataDog/datadog-agent/comp/remote-config/rcservice" "github.com/DataDog/datadog-agent/comp/remote-config/rcservicemrf" "github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl" @@ -179,7 +177,6 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command { autodiscoveryimpl.Module(), forwarder.Bundle(defaultforwarder.NewParams(defaultforwarder.WithNoopForwarder())), inventorychecksimpl.Module(), - rdnsquerierfx.Module(), // inventorychecksimpl depends on a collector and serializer when created to send payload. // Here we just want to collect metadata to be displayed, so we don't need a collector. collector.NoneModule(), @@ -267,7 +264,6 @@ func run( jmxLogger jmxlogger.Component, telemetry telemetry.Component, logReceiver optional.Option[integrations.Component], - rdnsquerier rdnsquerier.Component, ) error { previousIntegrationTracing := false previousIntegrationTracingExhaustive := false @@ -292,7 +288,6 @@ func run( // TODO: (components) - Until the checks are components we set there context so they can depends on components. 
check.InitializeInventoryChecksContext(invChecks) - check.InitializeRDNSQuerierContext(rdnsquerier) pkgcollector.InitPython(common.GetPythonPaths()...) commonchecks.RegisterChecks(wmeta, tagger, config, telemetry) diff --git a/pkg/collector/check/context.go b/pkg/collector/check/context.go index f8226b6cd8ca5..a1e39e4145376 100644 --- a/pkg/collector/check/context.go +++ b/pkg/collector/check/context.go @@ -10,7 +10,6 @@ import ( "sync" "github.com/DataDog/datadog-agent/comp/metadata/inventorychecks" - rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" ) // checkContext holds a list of reference to different components used by Go and Python checks. @@ -21,8 +20,7 @@ import ( // of C to Go. This way python checks can submit metadata to inventorychecks through the 'SetCheckMetadata' python // method. type checkContext struct { - ic inventorychecks.Component - rdnsquerier rdnsquerier.Component + ic inventorychecks.Component } var ctx checkContext @@ -49,32 +47,10 @@ func InitializeInventoryChecksContext(ic inventorychecks.Component) { } } -// GetRDNSQuerierContext returns a reference to the rdnsquerier component for Python and Go checks to use. 
-func GetRDNSQuerierContext() (rdnsquerier.Component, error) { - checkContextMutex.Lock() - defer checkContextMutex.Unlock() - - if ctx.rdnsquerier == nil { - return nil, errors.New("rdnsquerier context was not set") - } - return ctx.rdnsquerier, nil -} - -// InitializeRDNSQuerierContext set the reference to rdnsquerier in checkContext -func InitializeRDNSQuerierContext(rdnsquerier rdnsquerier.Component) { - checkContextMutex.Lock() - defer checkContextMutex.Unlock() - - if ctx.rdnsquerier == nil { - ctx.rdnsquerier = rdnsquerier - } -} - // ReleaseContext reset to nil all the references hold by the current context func ReleaseContext() { checkContextMutex.Lock() defer checkContextMutex.Unlock() ctx.ic = nil - ctx.rdnsquerier = nil } diff --git a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go index 7c82d002e203e..ff8d90912efb3 100644 --- a/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go +++ b/pkg/collector/corechecks/snmp/internal/report/report_device_metadata.go @@ -6,7 +6,6 @@ package report import ( - "context" json "encoding/json" "net" "sort" @@ -22,7 +21,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/networkdevice/profile/profiledefinition" "github.com/DataDog/datadog-agent/pkg/networkdevice/utils" - "github.com/DataDog/datadog-agent/pkg/collector/check" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/snmp/internal/checkconfig" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/snmp/internal/common" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/snmp/internal/lldp" @@ -216,16 +214,6 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc vendor = config.ProfileDef.Device.Vendor } - hostname := "" - if rdnsquerier, err := check.GetRDNSQuerierContext(); err == nil { - ctx, ctxCancel := context.WithTimeout(context.Background(), 1*time.Millisecond) - hostname, err = 
rdnsquerier.GetHostnameSync(ctx, config.IPAddress) - if err != nil { - log.Info("Error getting hostname: %v", err) - } - ctxCancel() - } - return devicemetadata.DeviceMetadata{ ID: deviceID, IDTags: idTags, @@ -250,7 +238,6 @@ func buildNetworkDeviceMetadata(deviceID string, idTags []string, config *checkc OsHostname: osHostname, DeviceType: deviceType, Integration: common.SnmpIntegrationName, - RDNSHostname: hostname, } } diff --git a/pkg/networkdevice/metadata/payload.go b/pkg/networkdevice/metadata/payload.go index 510a2a04f149c..54b2f45eb0996 100644 --- a/pkg/networkdevice/metadata/payload.go +++ b/pkg/networkdevice/metadata/payload.go @@ -71,7 +71,6 @@ type DeviceMetadata struct { OsHostname string `json:"os_hostname,omitempty"` Integration string `json:"integration,omitempty"` // indicates the source of the data SNMP, meraki_api, etc. DeviceType string `json:"device_type,omitempty"` - RDNSHostname string `json:"rdns_hostname,omitempty"` } // DeviceOID device scan oid data diff --git a/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml b/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml deleted file mode 100644 index c934be9d9a97b..0000000000000 --- a/releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebeeac.yaml +++ /dev/null @@ -1,11 +0,0 @@ -# Each section from every release note are combined when the -# CHANGELOG.rst is rendered. So the text needs to be worded so that -# it does not depend on any information only available in another -# section. This may mean repeating some details, but each section -# must be readable independently of the other. -# -# Each section note must be formatted as reStructuredText. 
---- -features: - - | - Added support for enriching SNMP IPs with hostnames through rDNS lookups From 4da5e29407dd85f24ea4db4bfb07438402aac6fe Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Thu, 24 Oct 2024 14:17:59 -0400 Subject: [PATCH 22/32] add function for multiple IPs parallel --- comp/rdnsquerier/impl/rdnsquerier.go | 121 +++++++++++++++++++-------- 1 file changed, 84 insertions(+), 37 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index 1dcbb02c5a6e0..3a46f267fe447 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -10,6 +10,7 @@ import ( "context" "fmt" "net/netip" + "sync" "github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" @@ -33,6 +34,13 @@ type Provides struct { Comp rdnsquerier.Component } +// ReverseDNSResult is the result of a reverse DNS lookup +type ReverseDNSResult struct { + IP string + Hostname string + Err error +} + const moduleName = "reverse_dns_enrichment" type rdnsQuerierTelemetry = struct { @@ -185,50 +193,89 @@ func (q *rdnsQuerierImpl) GetHostnameAsync(ipAddr []byte, updateHostnameSync fun // If the IP address is invalid then an error is returned. // If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned. // If the IP address is in the private address space then the IP address will be resolved to a hostname. -// The function accepts a timeout duration, which defaults to 2 seconds if not provided. +// The function accepts a timeout via context and will return an error if the timeout is reached. 
func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (string, error) { - q.internalTelemetry.total.Inc() - - netipAddr, err := netip.ParseAddr(ipAddr) - if err != nil { - q.internalTelemetry.invalidIPAddress.Inc() - return "", fmt.Errorf("invalid IP address %s: %v", ipAddr, err) + results := q.GetHostnames(ctx, []string{ipAddr}) + if result, ok := results[ipAddr]; !ok { + return "", fmt.Errorf("no result for IP address %s", ipAddr) + } else { + return result.Hostname, result.Err } +} - if !netipAddr.IsPrivate() { - q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr) - return "", nil +// GetHostnames attempts to resolve the hostname for the given IP addresses. +// If the IP address is invalid then an error is returned. +// If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned. +// If the IP address is in the private address space then the IP address will be resolved to a hostname. +// The function accepts a timeout via context and will return an error if the timeout is reached. 
+func (q *rdnsQuerierImpl) GetHostnames(ctx context.Context, ipAddrs []string) map[string]ReverseDNSResult { + q.internalTelemetry.total.Add(float64(len(ipAddrs))) + + var wg sync.WaitGroup + resultsChan := make(chan ReverseDNSResult, len(ipAddrs)) + + for _, ipAddr := range ipAddrs { + wg.Add(1) + go func(ipAddr string) { + defer wg.Done() + netipAddr, err := netip.ParseAddr(ipAddr) + if err != nil { + q.internalTelemetry.invalidIPAddress.Inc() + resultsChan <- ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("invalid IP address %s: %v", ipAddr, err)} + return + } + + if !netipAddr.IsPrivate() { + q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr) + resultsChan <- ReverseDNSResult{IP: ipAddr} + return + } + q.internalTelemetry.private.Inc() + + hostnameChan := make(chan string, 1) + asyncErrChan := make(chan error, 1) + + err = q.cache.getHostname( + netipAddr.String(), + func(h string) { + hostnameChan <- h + asyncErrChan <- nil + }, + func(h string, e error) { + hostnameChan <- h + asyncErrChan <- e + }, + ) + if err != nil { + q.logger.Debugf("Reverse DNS Enrichment cache.getHostname() for addr %s returned error: %v", netipAddr.String(), err) + } + + select { + case hostname := <-hostnameChan: + asyncErr := <-asyncErrChan // this is okay because we know that as soon as we send hostname, we send asyncErr + err = multierr.Append(err, asyncErr) + resultsChan <- ReverseDNSResult{ + IP: ipAddr, + Hostname: hostname, + Err: err, + } + case <-ctx.Done(): + resultsChan <- ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr)} + } + }(ipAddr) } - q.internalTelemetry.private.Inc() - hostnameChan := make(chan string, 1) - asyncErrChan := make(chan error, 1) - - err = q.cache.getHostname( - netipAddr.String(), - // only 1 of these callbacks will be executed - // so we don't risk a panic here - func(h string) { - hostnameChan <- h - asyncErrChan <- nil - }, - func(h 
string, e error) { - hostnameChan <- h - asyncErrChan <- e - }, - ) - if err != nil { - q.logger.Debugf("Reverse DNS Enrichment cache.getHostname() for addr %s returned error: %v", netipAddr.String(), err) - } + go func() { + wg.Wait() + close(resultsChan) + }() - select { - case hostname := <-hostnameChan: - asyncErr := <-asyncErrChan // this is okay because we know that as soon as we send hostname, we send asyncErr - err = multierr.Append(err, asyncErr) - return hostname, err - case <-ctx.Done(): - return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) + results := make(map[string]ReverseDNSResult, len(ipAddrs)) + for result := range resultsChan { + results[result.IP] = result } + + return results } func (q *rdnsQuerierImpl) start(_ context.Context) error { From f9978d9b28716252d7e2ddcb61c07d843d598395 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Thu, 24 Oct 2024 15:29:48 -0400 Subject: [PATCH 23/32] add tests for GetHostnames --- comp/rdnsquerier/impl/rdnsquerier_test.go | 73 +++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index 637cf87ceb6f9..9a6c6a5e78426 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -1086,3 +1086,76 @@ func TestGetHostnameSyncTimeouts(t *testing.T) { cancel() } + +func TestGetHostnames(t *testing.T) { + overrides := map[string]interface{}{ + "network_devices.netflow.reverse_dns_enrichment_enabled": true, + } + + defaultTs := testSetup(t, overrides, true, nil, 100*time.Millisecond) + + tests := []struct { + name string + ts *testState + ipAddrs []string + timeout time.Duration + expected map[string]ReverseDNSResult + }{ + { + name: "valid IPs", + ts: defaultTs, + ipAddrs: []string{"192.168.1.100", "192.168.1.101"}, + timeout: 1 * time.Second, + expected: map[string]ReverseDNSResult{ + "192.168.1.100": {IP: "192.168.1.100", Hostname: 
"fakehostname-192.168.1.100"}, + "192.168.1.101": {IP: "192.168.1.101", Hostname: "fakehostname-192.168.1.101"}, + }, + }, + { + name: "invalid IP, private IPs, and public IP", + ts: defaultTs, + ipAddrs: []string{"invalid_ip", "192.168.1.102", "8.8.8.8", "192.168.1.100"}, + timeout: 1 * time.Second, + expected: map[string]ReverseDNSResult{ + "invalid_ip": {IP: "invalid_ip", Err: fmt.Errorf("invalid IP address invalid_ip")}, + "192.168.1.102": {IP: "192.168.1.102", Hostname: "fakehostname-192.168.1.102"}, + "8.8.8.8": {IP: "8.8.8.8"}, + "192.168.1.100": {IP: "192.168.1.100", Hostname: "fakehostname-192.168.1.100"}, + }, + }, + { + name: "invalid IP, timeout for private and public IPs", + ts: testSetup(t, overrides, true, nil, 10*time.Second), + ipAddrs: []string{"192.168.1.105", "invalid", "8.8.8.8"}, + timeout: 1 * time.Second, + expected: map[string]ReverseDNSResult{ + "192.168.1.105": {IP: "192.168.1.105", Err: fmt.Errorf("timeout reached while resolving hostname for IP address 192.168.1.105")}, + "invalid": {IP: "invalid", Err: fmt.Errorf("invalid IP address invalid")}, + "8.8.8.8": {IP: "8.8.8.8"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(tt.ts.ctx, tt.timeout) + defer cancel() + + internalRDNSQuerier := tt.ts.rdnsQuerier.(*rdnsQuerierImpl) + results := internalRDNSQuerier.GetHostnames(ctx, tt.ipAddrs) + + for ip, expectedResult := range tt.expected { + result, ok := results[ip] + require.True(t, ok, "result for IP %s not found", ip) + assert.Equal(t, expectedResult.IP, result.IP) + assert.Equal(t, expectedResult.Hostname, result.Hostname) + if expectedResult.Err != nil { + require.Error(t, result.Err) + assert.Contains(t, result.Err.Error(), expectedResult.Err.Error()) + } else { + assert.NoError(t, result.Err) + } + } + }) + } +} From 66e64d89a1a3ada994947dc6232e77f110274c59 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Thu, 24 Oct 2024 15:31:16 -0400 Subject: [PATCH 24/32] 
fix linter error --- comp/rdnsquerier/impl/rdnsquerier.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index 3a46f267fe447..bcac6d6bda80b 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -196,11 +196,12 @@ func (q *rdnsQuerierImpl) GetHostnameAsync(ipAddr []byte, updateHostnameSync fun // The function accepts a timeout via context and will return an error if the timeout is reached. func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (string, error) { results := q.GetHostnames(ctx, []string{ipAddr}) - if result, ok := results[ipAddr]; !ok { + result, ok := results[ipAddr] + if !ok { return "", fmt.Errorf("no result for IP address %s", ipAddr) - } else { - return result.Hostname, result.Err } + + return result.Hostname, result.Err } // GetHostnames attempts to resolve the hostname for the given IP addresses. From 6073f3f882d741691098f50c7644a87c6f7acf05 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Thu, 24 Oct 2024 15:47:22 -0400 Subject: [PATCH 25/32] update component definition, add mock --- comp/rdnsquerier/def/component.go | 8 ++++++++ comp/rdnsquerier/impl-none/none.go | 5 +++++ comp/rdnsquerier/impl/rdnsquerier.go | 21 +++++++------------- comp/rdnsquerier/mock/mock.go | 29 +++++++++++++++++++++++----- 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/comp/rdnsquerier/def/component.go b/comp/rdnsquerier/def/component.go index acb35267041e9..f2cef531fcd98 100644 --- a/comp/rdnsquerier/def/component.go +++ b/comp/rdnsquerier/def/component.go @@ -12,8 +12,16 @@ import ( // team: network-device-monitoring +// ReverseDNSResult is the result of a reverse DNS lookup +type ReverseDNSResult struct { + IP string + Hostname string + Err error +} + // Component is the component type. 
type Component interface { GetHostnameAsync([]byte, func(string), func(string, error)) error GetHostnameSync(context.Context, string) (string, error) + GetHostnames(context.Context, []string) map[string]ReverseDNSResult } diff --git a/comp/rdnsquerier/impl-none/none.go b/comp/rdnsquerier/impl-none/none.go index eff12d040a1a2..551509abceab9 100644 --- a/comp/rdnsquerier/impl-none/none.go +++ b/comp/rdnsquerier/impl-none/none.go @@ -36,3 +36,8 @@ func (q *rdnsQuerierImplNone) GetHostnameSync(_ context.Context, _ string) (stri // noop return "", nil } + +func (q *rdnsQuerierImplNone) GetHostnames(_ context.Context, _ []string) map[string]rdnsquerier.ReverseDNSResult { + // noop + return nil +} diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index bcac6d6bda80b..8e87a77f2a536 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -34,13 +34,6 @@ type Provides struct { Comp rdnsquerier.Component } -// ReverseDNSResult is the result of a reverse DNS lookup -type ReverseDNSResult struct { - IP string - Hostname string - Err error -} - const moduleName = "reverse_dns_enrichment" type rdnsQuerierTelemetry = struct { @@ -209,11 +202,11 @@ func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (s // If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned. // If the IP address is in the private address space then the IP address will be resolved to a hostname. // The function accepts a timeout via context and will return an error if the timeout is reached. 
-func (q *rdnsQuerierImpl) GetHostnames(ctx context.Context, ipAddrs []string) map[string]ReverseDNSResult { +func (q *rdnsQuerierImpl) GetHostnames(ctx context.Context, ipAddrs []string) map[string]rdnsquerier.ReverseDNSResult { q.internalTelemetry.total.Add(float64(len(ipAddrs))) var wg sync.WaitGroup - resultsChan := make(chan ReverseDNSResult, len(ipAddrs)) + resultsChan := make(chan rdnsquerier.ReverseDNSResult, len(ipAddrs)) for _, ipAddr := range ipAddrs { wg.Add(1) @@ -222,13 +215,13 @@ func (q *rdnsQuerierImpl) GetHostnames(ctx context.Context, ipAddrs []string) ma netipAddr, err := netip.ParseAddr(ipAddr) if err != nil { q.internalTelemetry.invalidIPAddress.Inc() - resultsChan <- ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("invalid IP address %s: %v", ipAddr, err)} + resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("invalid IP address %s: %v", ipAddr, err)} return } if !netipAddr.IsPrivate() { q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr) - resultsChan <- ReverseDNSResult{IP: ipAddr} + resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr} return } q.internalTelemetry.private.Inc() @@ -255,13 +248,13 @@ func (q *rdnsQuerierImpl) GetHostnames(ctx context.Context, ipAddrs []string) ma case hostname := <-hostnameChan: asyncErr := <-asyncErrChan // this is okay because we know that as soon as we send hostname, we send asyncErr err = multierr.Append(err, asyncErr) - resultsChan <- ReverseDNSResult{ + resultsChan <- rdnsquerier.ReverseDNSResult{ IP: ipAddr, Hostname: hostname, Err: err, } case <-ctx.Done(): - resultsChan <- ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr)} + resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr)} } }(ipAddr) } @@ -271,7 +264,7 @@ func (q *rdnsQuerierImpl) GetHostnames(ctx context.Context, ipAddrs 
[]string) ma close(resultsChan) }() - results := make(map[string]ReverseDNSResult, len(ipAddrs)) + results := make(map[string]rdnsquerier.ReverseDNSResult, len(ipAddrs)) for result := range resultsChan { results[result.IP] = result } diff --git a/comp/rdnsquerier/mock/mock.go b/comp/rdnsquerier/mock/mock.go index d7665197ada6a..3505b543b749c 100644 --- a/comp/rdnsquerier/mock/mock.go +++ b/comp/rdnsquerier/mock/mock.go @@ -11,7 +11,6 @@ package mock import ( "context" "fmt" - "net" "net/netip" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" @@ -57,14 +56,34 @@ func (q *rdnsQuerierMock) GetHostnameAsync(ipAddr []byte, updateHostnameSync fun // GetHostnameSync simulates resolving the hostname for the given IP address synchronously. If the IP address is in the private address // space then the resolved hostname is returned. func (q *rdnsQuerierMock) GetHostnameSync(_ context.Context, ipAddr string) (string, error) { - ipaddr := net.ParseIP(ipAddr).To4() - if ipaddr == nil { + netipAddr, err := netip.ParseAddr(ipAddr) + if err != nil { return "", fmt.Errorf("invalid IP address %v", ipAddr) } - if !ipaddr.IsPrivate() { + if !netipAddr.IsPrivate() { return "", nil } - return "hostname-" + ipaddr.String(), nil + return "hostname-" + netipAddr.String(), nil +} + +// GetHostnames simulates resolving the hostnames for the given IP addresses synchronously. If the IP address is in the private address +// space then the resolved hostname is returned. 
+func (q *rdnsQuerierMock) GetHostnames(_ context.Context, ipAddrs []string) map[string]rdnsquerier.ReverseDNSResult { + results := make(map[string]rdnsquerier.ReverseDNSResult, len(ipAddrs)) + for _, ipAddr := range ipAddrs { + netipAddr, err := netip.ParseAddr(ipAddr) + if err != nil { + results[ipAddr] = rdnsquerier.ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("invalid IP address %v", ipAddr)} + } + + if !netipAddr.IsPrivate() { + results[ipAddr] = rdnsquerier.ReverseDNSResult{IP: ipAddr} + } + + results[ipAddr] = rdnsquerier.ReverseDNSResult{IP: ipAddr, Hostname: "hostname-" + netipAddr.String()} + } + + return results } From c69817bd1e681675f61013eefc808aca1ccc218c Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Thu, 24 Oct 2024 16:31:19 -0400 Subject: [PATCH 26/32] udpate naming --- comp/rdnsquerier/def/component.go | 2 +- comp/rdnsquerier/impl-none/none.go | 4 +++- comp/rdnsquerier/impl/rdnsquerier.go | 4 ++-- comp/rdnsquerier/mock/mock.go | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/comp/rdnsquerier/def/component.go b/comp/rdnsquerier/def/component.go index f2cef531fcd98..b28b688795403 100644 --- a/comp/rdnsquerier/def/component.go +++ b/comp/rdnsquerier/def/component.go @@ -22,6 +22,6 @@ type ReverseDNSResult struct { // Component is the component type. 
type Component interface { GetHostnameAsync([]byte, func(string), func(string, error)) error - GetHostnameSync(context.Context, string) (string, error) + GetHostname(context.Context, string) (string, error) GetHostnames(context.Context, []string) map[string]ReverseDNSResult } diff --git a/comp/rdnsquerier/impl-none/none.go b/comp/rdnsquerier/impl-none/none.go index 551509abceab9..0b7f1022f1eb9 100644 --- a/comp/rdnsquerier/impl-none/none.go +++ b/comp/rdnsquerier/impl-none/none.go @@ -32,11 +32,13 @@ func (q *rdnsQuerierImplNone) GetHostnameAsync(_ []byte, _ func(string), _ func( return nil } -func (q *rdnsQuerierImplNone) GetHostnameSync(_ context.Context, _ string) (string, error) { +// GetHostname does nothing for the noop rdnsquerier implementation +func (q *rdnsQuerierImplNone) GetHostname(_ context.Context, _ string) (string, error) { // noop return "", nil } +// GetHostnames does nothing for the noop rdnsquerier implementation func (q *rdnsQuerierImplNone) GetHostnames(_ context.Context, _ []string) map[string]rdnsquerier.ReverseDNSResult { // noop return nil diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index 8e87a77f2a536..a559e29f326ff 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -182,12 +182,12 @@ func (q *rdnsQuerierImpl) GetHostnameAsync(ipAddr []byte, updateHostnameSync fun return err } -// GetHostnameSync attempts to resolve the hostname for the given IP address synchronously. +// GetHostname attempts to resolve the hostname for the given IP address synchronously. // If the IP address is invalid then an error is returned. // If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned. // If the IP address is in the private address space then the IP address will be resolved to a hostname. // The function accepts a timeout via context and will return an error if the timeout is reached. 
-func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (string, error) { +func (q *rdnsQuerierImpl) GetHostname(ctx context.Context, ipAddr string) (string, error) { results := q.GetHostnames(ctx, []string{ipAddr}) result, ok := results[ipAddr] if !ok { diff --git a/comp/rdnsquerier/mock/mock.go b/comp/rdnsquerier/mock/mock.go index 3505b543b749c..afd26826d82fc 100644 --- a/comp/rdnsquerier/mock/mock.go +++ b/comp/rdnsquerier/mock/mock.go @@ -53,9 +53,9 @@ func (q *rdnsQuerierMock) GetHostnameAsync(ipAddr []byte, updateHostnameSync fun return nil } -// GetHostnameSync simulates resolving the hostname for the given IP address synchronously. If the IP address is in the private address +// GetHostname simulates resolving the hostname for the given IP address synchronously. If the IP address is in the private address // space then the resolved hostname is returned. -func (q *rdnsQuerierMock) GetHostnameSync(_ context.Context, ipAddr string) (string, error) { +func (q *rdnsQuerierMock) GetHostname(_ context.Context, ipAddr string) (string, error) { netipAddr, err := netip.ParseAddr(ipAddr) if err != nil { return "", fmt.Errorf("invalid IP address %v", ipAddr) From 312a20c0ac8401c0de2fe5b4c53bfe8dc7ab4e1a Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Thu, 24 Oct 2024 16:46:36 -0400 Subject: [PATCH 27/32] add import to test --- comp/rdnsquerier/impl/rdnsquerier_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index 9a6c6a5e78426..5fb2e950b663a 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -15,6 +15,8 @@ import ( "testing" "time" + rdnsquerierdef "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1057,7 +1059,7 @@ func TestGetHostnameSync(t *testing.T) { t.Run(name, func(t 
*testing.T) { ctx, cancel := context.WithTimeout(ts.ctx, tt.timeout) defer cancel() - hostname, err := internalRDNSQuerier.GetHostnameSync(ctx, tt.ip) + hostname, err := internalRDNSQuerier.GetHostname(ctx, tt.ip) if tt.errMsg != "" { require.Error(t, err) assert.Contains(t, err.Error(), tt.errMsg) @@ -1079,7 +1081,7 @@ func TestGetHostnameSyncTimeouts(t *testing.T) { ctx, cancel := context.WithTimeout(ts.ctx, 1*time.Millisecond) // Test with a timeout exceeding the specified timeout limit - hostname, err := internalRDNSQuerier.GetHostnameSync(ctx, "192.168.1.100") + hostname, err := internalRDNSQuerier.GetHostname(ctx, "192.168.1.100") assert.Equal(t, "", hostname) assert.Error(t, err) assert.Contains(t, err.Error(), "timeout reached while resolving hostname for IP address 192.168.1.100") @@ -1099,14 +1101,14 @@ func TestGetHostnames(t *testing.T) { ts *testState ipAddrs []string timeout time.Duration - expected map[string]ReverseDNSResult + expected map[string]rdnsquerierdef.ReverseDNSResult }{ { name: "valid IPs", ts: defaultTs, ipAddrs: []string{"192.168.1.100", "192.168.1.101"}, timeout: 1 * time.Second, - expected: map[string]ReverseDNSResult{ + expected: map[string]rdnsquerierdef.ReverseDNSResult{ "192.168.1.100": {IP: "192.168.1.100", Hostname: "fakehostname-192.168.1.100"}, "192.168.1.101": {IP: "192.168.1.101", Hostname: "fakehostname-192.168.1.101"}, }, @@ -1116,7 +1118,7 @@ func TestGetHostnames(t *testing.T) { ts: defaultTs, ipAddrs: []string{"invalid_ip", "192.168.1.102", "8.8.8.8", "192.168.1.100"}, timeout: 1 * time.Second, - expected: map[string]ReverseDNSResult{ + expected: map[string]rdnsquerierdef.ReverseDNSResult{ "invalid_ip": {IP: "invalid_ip", Err: fmt.Errorf("invalid IP address invalid_ip")}, "192.168.1.102": {IP: "192.168.1.102", Hostname: "fakehostname-192.168.1.102"}, "8.8.8.8": {IP: "8.8.8.8"}, @@ -1128,7 +1130,7 @@ func TestGetHostnames(t *testing.T) { ts: testSetup(t, overrides, true, nil, 10*time.Second), ipAddrs: 
[]string{"192.168.1.105", "invalid", "8.8.8.8"}, timeout: 1 * time.Second, - expected: map[string]ReverseDNSResult{ + expected: map[string]rdnsquerierdef.ReverseDNSResult{ "192.168.1.105": {IP: "192.168.1.105", Err: fmt.Errorf("timeout reached while resolving hostname for IP address 192.168.1.105")}, "invalid": {IP: "invalid", Err: fmt.Errorf("invalid IP address invalid")}, "8.8.8.8": {IP: "8.8.8.8"}, From de28955b711e383fe11fce9d907fa1e95a37fa13 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Fri, 25 Oct 2024 10:00:09 -0400 Subject: [PATCH 28/32] remove newline --- pkg/cli/subcommands/check/command.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/cli/subcommands/check/command.go b/pkg/cli/subcommands/check/command.go index 8a70b5eb3e2c4..56f224389b455 100644 --- a/pkg/cli/subcommands/check/command.go +++ b/pkg/cli/subcommands/check/command.go @@ -288,7 +288,6 @@ func run( // TODO: (components) - Until the checks are components we set there context so they can depends on components. check.InitializeInventoryChecksContext(invChecks) - pkgcollector.InitPython(common.GetPythonPaths()...) commonchecks.RegisterChecks(wmeta, tagger, config, telemetry) From 92fcea33560049612b5ff275bf015a19b09b3e2e Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Fri, 25 Oct 2024 10:39:44 -0400 Subject: [PATCH 29/32] separate out single query logic --- comp/rdnsquerier/impl/rdnsquerier.go | 92 ++++++++++------------- comp/rdnsquerier/impl/rdnsquerier_test.go | 50 +++++++++++- 2 files changed, 87 insertions(+), 55 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index a559e29f326ff..86b4d40e91ac8 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -188,13 +188,43 @@ func (q *rdnsQuerierImpl) GetHostnameAsync(ipAddr []byte, updateHostnameSync fun // If the IP address is in the private address space then the IP address will be resolved to a hostname. 
// The function accepts a timeout via context and will return an error if the timeout is reached. func (q *rdnsQuerierImpl) GetHostname(ctx context.Context, ipAddr string) (string, error) { - results := q.GetHostnames(ctx, []string{ipAddr}) - result, ok := results[ipAddr] - if !ok { - return "", fmt.Errorf("no result for IP address %s", ipAddr) + q.internalTelemetry.total.Inc() + + netipAddr, err := netip.ParseAddr(ipAddr) + if err != nil { + q.internalTelemetry.invalidIPAddress.Inc() + return "", fmt.Errorf("invalid IP address %s: %v", ipAddr, err) + } + + if !netipAddr.IsPrivate() { + q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr) + return "", nil } + q.internalTelemetry.private.Inc() + + resultsChan := make(chan rdnsquerier.ReverseDNSResult, 1) - return result.Hostname, result.Err + err = q.cache.getHostname( + netipAddr.String(), + func(h string) { + resultsChan <- rdnsquerier.ReverseDNSResult{Hostname: h} + }, + func(h string, e error) { + resultsChan <- rdnsquerier.ReverseDNSResult{Hostname: h, Err: e} + }, + ) + if err != nil { + q.logger.Debugf("Reverse DNS Enrichment cache.getHostname() for addr %s returned error: %v", netipAddr.String(), err) + return "", err + } + + select { + case result := <-resultsChan: + err = multierr.Append(err, result.Err) + return result.Hostname, err + case <-ctx.Done(): + return "", fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr) + } } // GetHostnames attempts to resolve the hostname for the given IP addresses. @@ -203,60 +233,16 @@ func (q *rdnsQuerierImpl) GetHostname(ctx context.Context, ipAddr string) (strin // If the IP address is in the private address space then the IP address will be resolved to a hostname. // The function accepts a timeout via context and will return an error if the timeout is reached. 
func (q *rdnsQuerierImpl) GetHostnames(ctx context.Context, ipAddrs []string) map[string]rdnsquerier.ReverseDNSResult { - q.internalTelemetry.total.Add(float64(len(ipAddrs))) - var wg sync.WaitGroup resultsChan := make(chan rdnsquerier.ReverseDNSResult, len(ipAddrs)) for _, ipAddr := range ipAddrs { wg.Add(1) - go func(ipAddr string) { + go func(ctx context.Context, ipAddr string) { defer wg.Done() - netipAddr, err := netip.ParseAddr(ipAddr) - if err != nil { - q.internalTelemetry.invalidIPAddress.Inc() - resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("invalid IP address %s: %v", ipAddr, err)} - return - } - - if !netipAddr.IsPrivate() { - q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr) - resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr} - return - } - q.internalTelemetry.private.Inc() - - hostnameChan := make(chan string, 1) - asyncErrChan := make(chan error, 1) - - err = q.cache.getHostname( - netipAddr.String(), - func(h string) { - hostnameChan <- h - asyncErrChan <- nil - }, - func(h string, e error) { - hostnameChan <- h - asyncErrChan <- e - }, - ) - if err != nil { - q.logger.Debugf("Reverse DNS Enrichment cache.getHostname() for addr %s returned error: %v", netipAddr.String(), err) - } - - select { - case hostname := <-hostnameChan: - asyncErr := <-asyncErrChan // this is okay because we know that as soon as we send hostname, we send asyncErr - err = multierr.Append(err, asyncErr) - resultsChan <- rdnsquerier.ReverseDNSResult{ - IP: ipAddr, - Hostname: hostname, - Err: err, - } - case <-ctx.Done(): - resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr)} - } - }(ipAddr) + hostname, err := q.GetHostname(ctx, ipAddr) + resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr, Hostname: hostname, Err: err} + }(ctx, ipAddr) } go func() { diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go 
b/comp/rdnsquerier/impl/rdnsquerier_test.go index 5fb2e950b663a..57740492ca8e6 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -997,7 +997,7 @@ func TestCachePersist(t *testing.T) { ts.validateExpected(t, expectedTelemetry) } -func TestGetHostnameSync(t *testing.T) { +func TestGetHostname(t *testing.T) { overrides := map[string]interface{}{ "network_devices.netflow.reverse_dns_enrichment_enabled": true, } @@ -1071,7 +1071,7 @@ func TestGetHostnameSync(t *testing.T) { } } -func TestGetHostnameSyncTimeouts(t *testing.T) { +func TestGetHostnameTimeout(t *testing.T) { overrides := map[string]interface{}{ "network_devices.netflow.reverse_dns_enrichment_enabled": true, } @@ -1089,6 +1089,52 @@ func TestGetHostnameSyncTimeouts(t *testing.T) { cancel() } +// Test that when the rate limit is exceeded and the channel fills requests are dropped. +func TestGetHostnameChannelFullRequestsDroppedWhenRateLimited(t *testing.T) { + overrides := map[string]interface{}{ + "network_devices.netflow.reverse_dns_enrichment_enabled": true, + "reverse_dns_enrichment.workers": 1, + "reverse_dns_enrichment.chan_size": 1, + "reverse_dns_enrichment.rate_limiter.enabled": true, + "reverse_dns_enrichment.rate_limiter.limit_per_sec": 1, + } + ts := testSetup(t, overrides, true, nil, 0) + + // IP addresses in private range + var errCount int + var wg sync.WaitGroup + for i := range 20 { + wg.Add(1) + go func(i int) { + defer wg.Done() + hostname, err := ts.rdnsQuerier.GetHostname(ts.ctx, fmt.Sprintf("192.168.1.%d", i)) + if err != nil { + assert.ErrorContains(t, err, "channel is full, dropping query for IP address") + errCount++ + } else { + assert.Equal(t, fmt.Sprintf("fakehostname-192.168.1.%d", i), hostname) + } + }(i) + } + wg.Wait() + + assert.GreaterOrEqual(t, errCount, 1) + expectedTelemetry := ts.makeExpectedTelemetry(map[string]float64{ + "total": 20.0, + "private": 20.0, + "chan_added": float64(20 - errCount), + 
"dropped_chan_full": float64(errCount), + "cache_miss": 20.0, + }) + delete(expectedTelemetry, "successful") + ts.validateExpected(t, expectedTelemetry) + + minimumTelemetry := map[string]float64{ + "successful": 1.0, + } + ts.validateMinimum(t, minimumTelemetry) +} + func TestGetHostnames(t *testing.T) { overrides := map[string]interface{}{ "network_devices.netflow.reverse_dns_enrichment_enabled": true, From 2ebb1aef65f023bbb85782ef49e14500cd0804f2 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Fri, 25 Oct 2024 11:15:12 -0400 Subject: [PATCH 30/32] add delay to test --- comp/rdnsquerier/impl/rdnsquerier_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index 57740492ca8e6..47b8a690ea194 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -1098,7 +1098,7 @@ func TestGetHostnameChannelFullRequestsDroppedWhenRateLimited(t *testing.T) { "reverse_dns_enrichment.rate_limiter.enabled": true, "reverse_dns_enrichment.rate_limiter.limit_per_sec": 1, } - ts := testSetup(t, overrides, true, nil, 0) + ts := testSetup(t, overrides, true, nil, 1*time.Second) // IP addresses in private range var errCount int From a77b6c91c1633bdd0987f0b599f59b312c68e8a0 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Fri, 25 Oct 2024 11:27:33 -0400 Subject: [PATCH 31/32] increase test delay --- comp/rdnsquerier/impl/rdnsquerier_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index 47b8a690ea194..d094340c888f1 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -1098,7 +1098,7 @@ func TestGetHostnameChannelFullRequestsDroppedWhenRateLimited(t *testing.T) { "reverse_dns_enrichment.rate_limiter.enabled": true, "reverse_dns_enrichment.rate_limiter.limit_per_sec": 1, } - 
ts := testSetup(t, overrides, true, nil, 1*time.Second) + ts := testSetup(t, overrides, true, nil, 5*time.Second) // IP addresses in private range var errCount int From c704f605f706330729166ec8b6b9ccd65871b179 Mon Sep 17 00:00:00 2001 From: Ken Schneider Date: Fri, 25 Oct 2024 14:04:28 -0400 Subject: [PATCH 32/32] fix race condition --- comp/rdnsquerier/impl/rdnsquerier_test.go | 13 +++++++------ comp/rdnsquerier/impl/rdnsquerier_testutils.go | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/comp/rdnsquerier/impl/rdnsquerier_test.go b/comp/rdnsquerier/impl/rdnsquerier_test.go index d094340c888f1..81d80b8ea2f7c 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_test.go +++ b/comp/rdnsquerier/impl/rdnsquerier_test.go @@ -12,6 +12,7 @@ import ( "fmt" "net" "sync" + "sync/atomic" "testing" "time" @@ -1098,10 +1099,10 @@ func TestGetHostnameChannelFullRequestsDroppedWhenRateLimited(t *testing.T) { "reverse_dns_enrichment.rate_limiter.enabled": true, "reverse_dns_enrichment.rate_limiter.limit_per_sec": 1, } - ts := testSetup(t, overrides, true, nil, 5*time.Second) + ts := testSetup(t, overrides, true, nil, 1*time.Second) // IP addresses in private range - var errCount int + var errCount atomic.Int32 var wg sync.WaitGroup for i := range 20 { wg.Add(1) @@ -1110,7 +1111,7 @@ func TestGetHostnameChannelFullRequestsDroppedWhenRateLimited(t *testing.T) { hostname, err := ts.rdnsQuerier.GetHostname(ts.ctx, fmt.Sprintf("192.168.1.%d", i)) if err != nil { assert.ErrorContains(t, err, "channel is full, dropping query for IP address") - errCount++ + errCount.Add(1) } else { assert.Equal(t, fmt.Sprintf("fakehostname-192.168.1.%d", i), hostname) } @@ -1118,12 +1119,12 @@ func TestGetHostnameChannelFullRequestsDroppedWhenRateLimited(t *testing.T) { } wg.Wait() - assert.GreaterOrEqual(t, errCount, 1) + assert.GreaterOrEqual(t, errCount.Load(), int32(1)) expectedTelemetry := ts.makeExpectedTelemetry(map[string]float64{ "total": 20.0, "private": 20.0, - "chan_added": 
float64(20 - errCount), - "dropped_chan_full": float64(errCount), + "chan_added": float64(20 - errCount.Load()), + "dropped_chan_full": float64(errCount.Load()), "cache_miss": 20.0, }) delete(expectedTelemetry, "successful") diff --git a/comp/rdnsquerier/impl/rdnsquerier_testutils.go b/comp/rdnsquerier/impl/rdnsquerier_testutils.go index 3b3f9cf2359b6..d1b43364ac224 100644 --- a/comp/rdnsquerier/impl/rdnsquerier_testutils.go +++ b/comp/rdnsquerier/impl/rdnsquerier_testutils.go @@ -159,7 +159,7 @@ func (ts *testState) validateExpected(t *testing.T, expectedTelemetry map[string } else { assert.NoError(t, err) assert.Len(t, metrics, 1) - assert.Equal(t, expected, metrics[0].Value()) + assert.Equal(t, expected, metrics[0].Value(), name) } } }