Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NDMII-3058] Add synchronous rDNS lookup to rdnsquerier component #30002

Merged
merged 33 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
28291f0
Allow rdsquerier to run in snmp core check
vicweiss Oct 9, 2024
6fdcbf4
Move Sync rDNS lookup to querier
vicweiss Oct 10, 2024
14d6fd0
add release notes
vicweiss Oct 10, 2024
e33a05e
Add component for rdnsQuerierImplNone
vicweiss Oct 10, 2024
b186c38
Add component mock
vicweiss Oct 16, 2024
96d868f
debug: add log line
vicweiss Oct 16, 2024
44a9d29
Fix: SNMP CLI command, missing rdsquerier fx import
vicweiss Oct 17, 2024
13be360
Add timeout the sync rDNS resolver
vicweiss Oct 18, 2024
050bda0
Make it thread safe
vicweiss Oct 18, 2024
03aeb57
Make the linters :)
vicweiss Oct 18, 2024
8630743
fix race condition
vicweiss Oct 18, 2024
0327484
Fix timeout, added tests
vicweiss Oct 18, 2024
05da484
Remove debug logs
vicweiss Oct 21, 2024
1638c35
Log errors
vicweiss Oct 21, 2024
64ebedf
Merge branch 'main' into vic.weiss/snmp_rdns_enrichment
vicweiss Oct 21, 2024
d2f57ae
Linter fixes
vicweiss Oct 21, 2024
a456375
More linting
vicweiss Oct 21, 2024
d115431
Update releasenotes/notes/sndm-rdns-hostname-enrichment-1ca16478f8ebe…
vicweiss Oct 22, 2024
0f734f1
rename GetHostname to GetHostnameAsync
ken-schneider Oct 23, 2024
b2c0cca
use channels instead of a lock
ken-schneider Oct 23, 2024
4d29545
use table test for unit test
ken-schneider Oct 23, 2024
5df33c0
Remove rDNS querier from SNMP core check (#30439)
vicweiss Oct 23, 2024
4da5e29
add function for multiple IPs parallel
ken-schneider Oct 24, 2024
f9978d9
add tests for GetHostnames
ken-schneider Oct 24, 2024
66e64d8
fix linter error
ken-schneider Oct 24, 2024
6073f3f
update component definition, add mock
ken-schneider Oct 24, 2024
c69817b
udpate naming
ken-schneider Oct 24, 2024
312a20c
add import to test
ken-schneider Oct 24, 2024
de28955
remove newline
ken-schneider Oct 25, 2024
92fcea3
separate out single query logic
ken-schneider Oct 25, 2024
2ebb1ae
add delay to test
ken-schneider Oct 25, 2024
a77b6c9
increase test delay
ken-schneider Oct 25, 2024
c704f60
fix race condition
ken-schneider Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions comp/netflow/flowaggregator/flowaccumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (f *flowAccumulator) setDstReverseDNSHostname(aggHash uint64, hostname stri
}

func (f *flowAccumulator) addRDNSEnrichment(aggHash uint64, srcAddr []byte, dstAddr []byte) {
err := f.rdnsQuerier.GetHostname(
err := f.rdnsQuerier.GetHostnameAsync(
srcAddr,
// Sync callback, lock is already held
func(hostname string) {
Expand All @@ -213,7 +213,7 @@ func (f *flowAccumulator) addRDNSEnrichment(aggHash uint64, srcAddr []byte, dstA
f.logger.Debugf("Error requesting reverse DNS enrichment for source IP address: %v error: %v", srcAddr, err)
}

err = f.rdnsQuerier.GetHostname(
err = f.rdnsQuerier.GetHostnameAsync(
dstAddr,
// Sync callback, lock is held
func(hostname string) {
Expand Down
15 changes: 14 additions & 1 deletion comp/rdnsquerier/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,22 @@
// Package rdnsquerier provides the reverse DNS querier component.
package rdnsquerier

import (
"context"
)

// team: network-device-monitoring

// ReverseDNSResult is the result of a reverse DNS lookup
type ReverseDNSResult struct {
IP string
Hostname string
Err error
}

// Component is the component type.
type Component interface {
GetHostname([]byte, func(string), func(string, error)) error
GetHostnameAsync([]byte, func(string), func(string, error)) error
GetHostnameSync(context.Context, string) (string, error)
GetHostnames(context.Context, []string) map[string]ReverseDNSResult
}
14 changes: 13 additions & 1 deletion comp/rdnsquerier/impl-none/none.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package rdnsquerierimpl

import (
"context"

rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def"
)

Expand All @@ -25,7 +27,17 @@ func NewNone() Provides {
}

// GetHostnameAsync does nothing for the noop rdnsquerier implementation
func (q *rdnsQuerierImplNone) GetHostname(_ []byte, _ func(string), _ func(string, error)) error {
func (q *rdnsQuerierImplNone) GetHostnameAsync(_ []byte, _ func(string), _ func(string, error)) error {
// noop
return nil
}

func (q *rdnsQuerierImplNone) GetHostnameSync(_ context.Context, _ string) (string, error) {
// noop
return "", nil
}

func (q *rdnsQuerierImplNone) GetHostnames(_ context.Context, _ []string) map[string]rdnsquerier.ReverseDNSResult {
// noop
return nil
}
99 changes: 95 additions & 4 deletions comp/rdnsquerier/impl/rdnsquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"context"
"fmt"
"net/netip"
"sync"

"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
compdef "github.com/DataDog/datadog-agent/comp/def"
rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def"
rdnsquerierimplnone "github.com/DataDog/datadog-agent/comp/rdnsquerier/impl-none"
"go.uber.org/multierr"
)

// Requires defines the dependencies for the rdnsquerier component
Expand Down Expand Up @@ -157,7 +159,7 @@ func NewComponent(reqs Requires) (Provides, error) {
// If the hostname for the IP address is immediately available (i.e. cache is enabled and entry is cached) then the updateHostnameSync callback
// will be invoked synchronously, otherwise a query is sent to a channel to be processed asynchronously. If the channel is full then an error
// is returned. When the request completes the updateHostnameAsync callback will be invoked asynchronously.
func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(string), updateHostnameAsync func(string, error)) error {
func (q *rdnsQuerierImpl) GetHostnameAsync(ipAddr []byte, updateHostnameSync func(string), updateHostnameAsync func(string, error)) error {
q.internalTelemetry.total.Inc()

netipAddr, ok := netip.AddrFromSlice(ipAddr)
Expand All @@ -167,18 +169,107 @@ func (q *rdnsQuerierImpl) GetHostname(ipAddr []byte, updateHostnameSync func(str
}

if !netipAddr.IsPrivate() {
q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", netipAddr)
q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr)
return nil
}
q.internalTelemetry.private.Inc()

err := q.cache.getHostname(netipAddr.String(), updateHostnameSync, updateHostnameAsync)
if err != nil {
q.logger.Debugf("Reverse DNS Enrichment cache.getHostname() for addr %s returned error: %v", netipAddr.String(), err)
return err
}

return nil
return err
}

// GetHostnameSync attempts to resolve the hostname for the given IP address synchronously.
// If the IP address is invalid then an error is returned.
// If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned.
// If the IP address is in the private address space then the IP address will be resolved to a hostname.
// The function accepts a timeout via context and will return an error if the timeout is reached.
func (q *rdnsQuerierImpl) GetHostnameSync(ctx context.Context, ipAddr string) (string, error) {
Copy link
Member

@AlexandreYang AlexandreYang Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocker, but I have the feeling that things could be simpler for a synchronous reverse DNS (with cache).

Maybe the need for chan/mutex/context timeout/callbacks is due to initial implementation being very async oriented (q.GetHostname(...)).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that, it seems that for a synchronous reverse DNS with cache, then basic logic could be something like:

  • client request dns entry
  • the reverse DNS library checks if entry is in cache and returns results
  • if not, make actual reverse DNS calls, add results to cache and returns results
    with everything above being synchronous

But instead, we have much complex setup:

  • client request dns entry
  • the reverse DNS library checks if entry is in cache and callback is called
  • if not:
    • push request to a queue (querierImpl.getHostnameAsync)
    • queue is consumed by workers
    • workers make actual reverse DNS calls
    • results are added to cache and callbacks are called
  • callback/chan/mutex are needed in GetHostnameSync to interact with the async GetHostname

Copy link
Member

@AlexandreYang AlexandreYang Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, maybe having a async underlying implementation can be beneficial for this: https://github.com/DataDog/datadog-agent/pull/30002/files#r1814405433

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I've thought about that as well. I think there are some benefits to leveraging at least some of the whole workers paradigm.

Consider a large server with Network Traffic Paths enabled. A large number of workers is required to handle this load. With the first implementation described above, we're likely to make many more calls to the DNS server as the parallelism scales up. Depending on the traffic patterns we detect and the user's network topology, this might be more of a cold start problem than a continuous issue.

When @jmw51798, @vicweiss, and I discussed it getting all of that stuff to work with a synchronous implementation seemed like a large effort.

vicweiss marked this conversation as resolved.
Show resolved Hide resolved
results := q.GetHostnames(ctx, []string{ipAddr})
result, ok := results[ipAddr]
if !ok {
return "", fmt.Errorf("no result for IP address %s", ipAddr)
}

return result.Hostname, result.Err
}

// GetHostnames attempts to resolve the hostname for the given IP addresses.
// If the IP address is invalid then an error is returned.
// If the IP address is not in the private address space then it is ignored - no lookup is performed and nil error is returned.
// If the IP address is in the private address space then the IP address will be resolved to a hostname.
// The function accepts a timeout via context and will return an error if the timeout is reached.
func (q *rdnsQuerierImpl) GetHostnames(ctx context.Context, ipAddrs []string) map[string]rdnsquerier.ReverseDNSResult {
q.internalTelemetry.total.Add(float64(len(ipAddrs)))

var wg sync.WaitGroup
resultsChan := make(chan rdnsquerier.ReverseDNSResult, len(ipAddrs))

for _, ipAddr := range ipAddrs {
wg.Add(1)
go func(ipAddr string) {
defer wg.Done()
netipAddr, err := netip.ParseAddr(ipAddr)
if err != nil {
q.internalTelemetry.invalidIPAddress.Inc()
resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("invalid IP address %s: %v", ipAddr, err)}
return
}

if !netipAddr.IsPrivate() {
q.logger.Tracef("Reverse DNS Enrichment IP address %s is not in the private address space", ipAddr)
resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr}
return
}
q.internalTelemetry.private.Inc()

hostnameChan := make(chan string, 1)
asyncErrChan := make(chan error, 1)

err = q.cache.getHostname(
netipAddr.String(),
func(h string) {
hostnameChan <- h
asyncErrChan <- nil
},
func(h string, e error) {
hostnameChan <- h
asyncErrChan <- e
},
)
if err != nil {
q.logger.Debugf("Reverse DNS Enrichment cache.getHostname() for addr %s returned error: %v", netipAddr.String(), err)
}

select {
case hostname := <-hostnameChan:
asyncErr := <-asyncErrChan // this is okay because we know that as soon as we send hostname, we send asyncErr
jmw51798 marked this conversation as resolved.
Show resolved Hide resolved
err = multierr.Append(err, asyncErr)
resultsChan <- rdnsquerier.ReverseDNSResult{
IP: ipAddr,
Hostname: hostname,
Err: err,
}
case <-ctx.Done():
resultsChan <- rdnsquerier.ReverseDNSResult{IP: ipAddr, Err: fmt.Errorf("timeout reached while resolving hostname for IP address %v", ipAddr)}
}
}(ipAddr)
}

go func() {
wg.Wait()
close(resultsChan)
}()

results := make(map[string]rdnsquerier.ReverseDNSResult, len(ipAddrs))
for result := range resultsChan {
results[result.IP] = result
}

return results
}

func (q *rdnsQuerierImpl) start(_ context.Context) error {
Expand Down
Loading
Loading