Skip to content

Commit

Permalink
dns processor - Add A, AAAA, and TXT query support (elastic#36394)
Browse files Browse the repository at this point in the history
The dns processor previously supported only reverse DNS lookups.
This adds support for performing A, AAAA, and TXT record queries.

The response.ptr.histogram metric was renamed to request_duration.histogram.
This naming allows the metric to represent the duration of the DNS request
for all query types.

Some refactoring was done to unexport types/functions that should have been
internal only.

Closes elastic#11416

Co-authored-by: Dan Kortschak <[email protected]>
  • Loading branch information
2 people authored and Scholar-Li committed Feb 5, 2024
1 parent b75ca03 commit 883e03c
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 225 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- When running under Elastic-Agent the status is now reported per Unit instead of the whole Beat {issue}35874[35874] {pull}36183[36183]
- Add warning message to SysV init scripts for RPM-based systems that lack `/etc/rc.d/init.d/functions`. {issue}35708[35708] {pull}36188[36188]
- Mark `translate_sid` processor is GA. {issue}36279[36279] {pull}36280[36280]
- dns processor: Add support for forward lookups (`A`, `AAAA`, and `TXT`). {issue}11416[11416] {pull}36394[36394]

*Auditbeat*

Expand Down
68 changes: 34 additions & 34 deletions libbeat/processors/dns/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,38 @@ import (
"github.com/elastic/elastic-agent-libs/monitoring"
)

type ptrRecord struct {
host string
type successRecord struct {
data []string
expires time.Time
}

func (r ptrRecord) IsExpired(now time.Time) bool {
func (r successRecord) IsExpired(now time.Time) bool {
return now.After(r.expires)
}

type ptrCache struct {
type successCache struct {
sync.RWMutex
data map[string]ptrRecord
data map[string]successRecord
maxSize int
minSuccessTTL time.Duration
}

func (c *ptrCache) set(now time.Time, key string, ptr *PTR) {
func (c *successCache) set(now time.Time, key string, result *result) {
c.Lock()
defer c.Unlock()

if len(c.data) >= c.maxSize {
c.evict()
}

c.data[key] = ptrRecord{
host: ptr.Host,
expires: now.Add(time.Duration(ptr.TTL) * time.Second),
c.data[key] = successRecord{
data: result.Data,
expires: now.Add(time.Duration(result.TTL) * time.Second),
}
}

// evict removes a single random key from the cache.
func (c *ptrCache) evict() {
func (c *successCache) evict() {
var key string
for k := range c.data {
key = k
Expand All @@ -64,13 +64,13 @@ func (c *ptrCache) evict() {
delete(c.data, key)
}

func (c *ptrCache) get(now time.Time, key string) *PTR {
func (c *successCache) get(now time.Time, key string) *result {
c.RLock()
defer c.RUnlock()

r, found := c.data[key]
if found && !r.IsExpired(now) {
return &PTR{r.host, uint32(r.expires.Sub(now) / time.Second)}
return &result{r.data, uint32(r.expires.Sub(now) / time.Second)}
}
return nil
}
Expand Down Expand Up @@ -132,13 +132,13 @@ type cachedError struct {
func (ce *cachedError) Error() string { return ce.err.Error() + " (from failure cache)" }
func (ce *cachedError) Cause() error { return ce.err }

// PTRLookupCache is a cache for storing and retrieving the results of
// reverse DNS queries. It caches the results of queries regardless of their
// lookupCache is a cache for storing and retrieving the results of
// DNS queries. It caches the results of queries regardless of their
// outcome (success or failure).
type PTRLookupCache struct {
success *ptrCache
type lookupCache struct {
success *successCache
failure *failureCache
resolver PTRResolver
resolver resolver
stats cacheStats
}

Expand All @@ -147,15 +147,15 @@ type cacheStats struct {
Miss *monitoring.Int
}

// NewPTRLookupCache returns a new cache.
func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRResolver) (*PTRLookupCache, error) {
// newLookupCache returns a new cache.
func newLookupCache(reg *monitoring.Registry, conf cacheConfig, resolver resolver) (*lookupCache, error) {
if err := conf.Validate(); err != nil {
return nil, err
}

c := &PTRLookupCache{
success: &ptrCache{
data: make(map[string]ptrRecord, conf.SuccessCache.InitialCapacity),
c := &lookupCache{
success: &successCache{
data: make(map[string]successRecord, conf.SuccessCache.InitialCapacity),
maxSize: conf.SuccessCache.MaxCapacity,
minSuccessTTL: conf.SuccessCache.MinTTL,
},
Expand All @@ -174,36 +174,36 @@ func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRR
return c, nil
}

// LookupPTR performs a reverse lookup on the given IP address. A cached result
// Lookup performs a lookup on the given query string. A cached result
// will be returned if it is contained in the cache, otherwise a lookup is
// performed.
func (c PTRLookupCache) LookupPTR(ip string) (*PTR, error) {
func (c lookupCache) Lookup(q string, qt queryType) (*result, error) {
now := time.Now()

ptr := c.success.get(now, ip)
if ptr != nil {
r := c.success.get(now, q)
if r != nil {
c.stats.Hit.Inc()
return ptr, nil
return r, nil
}

err := c.failure.get(now, ip)
err := c.failure.get(now, q)
if err != nil {
c.stats.Hit.Inc()
return nil, err
}
c.stats.Miss.Inc()

ptr, err = c.resolver.LookupPTR(ip)
r, err = c.resolver.Lookup(q, qt)
if err != nil {
c.failure.set(now, ip, &cachedError{err})
c.failure.set(now, q, &cachedError{err})
return nil, err
}

// We set the ptr.TTL to the minimum TTL in case it is less than that.
ptr.TTL = max(ptr.TTL, uint32(c.success.minSuccessTTL/time.Second))
// We set the result TTL to the minimum TTL in case it is less than that.
r.TTL = max(r.TTL, uint32(c.success.minSuccessTTL/time.Second))

c.success.set(now, ip, ptr)
return ptr, nil
c.success.set(now, q, r)
return r, nil
}

func max(a, b uint32) uint32 {
Expand Down
52 changes: 26 additions & 26 deletions libbeat/processors/dns/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,85 +29,85 @@ import (

type stubResolver struct{}

func (r *stubResolver) LookupPTR(ip string) (*PTR, error) {
func (r *stubResolver) Lookup(ip string, _ queryType) (*result, error) {
switch ip {
case gatewayIP:
return &PTR{Host: gatewayName, TTL: gatewayTTL}, nil
return &result{Data: []string{gatewayName}, TTL: gatewayTTL}, nil
case gatewayIP + "1":
return nil, io.ErrUnexpectedEOF
case gatewayIP + "2":
return &PTR{Host: gatewayName, TTL: 0}, nil
return &result{Data: []string{gatewayName}, TTL: 0}, nil
}
return nil, &dnsError{"fake lookup returned NXDOMAIN"}
}

func TestCache(t *testing.T) {
c, err := NewPTRLookupCache(
c, err := newLookupCache(
monitoring.NewRegistry(),
defaultConfig.CacheConfig,
defaultConfig().cacheConfig,
&stubResolver{})
if err != nil {
t.Fatal(err)
}

// Initial success query.
ptr, err := c.LookupPTR(gatewayIP)
r, err := c.Lookup(gatewayIP, typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, gatewayTTL, ptr.TTL)
assert.EqualValues(t, []string{gatewayName}, r.Data)
assert.EqualValues(t, gatewayTTL, r.TTL)
assert.EqualValues(t, 0, c.stats.Hit.Get())
assert.EqualValues(t, 1, c.stats.Miss.Get())
}

// Cached success query.
ptr, err = c.LookupPTR(gatewayIP)
r, err = c.Lookup(gatewayIP, typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)
// TTL counts down while in cache.
assert.InDelta(t, gatewayTTL, ptr.TTL, 1)
assert.InDelta(t, gatewayTTL, r.TTL, 1)
assert.EqualValues(t, 1, c.stats.Hit.Get())
assert.EqualValues(t, 1, c.stats.Miss.Get())
}

// Initial failure query (like a dns error response code).
ptr, err = c.LookupPTR(gatewayIP + "0")
r, err = c.Lookup(gatewayIP+"0", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 1, c.stats.Hit.Get())
assert.EqualValues(t, 2, c.stats.Miss.Get())
}

// Cached failure query.
ptr, err = c.LookupPTR(gatewayIP + "0")
r, err = c.Lookup(gatewayIP+"0", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 2, c.stats.Hit.Get())
assert.EqualValues(t, 2, c.stats.Miss.Get())
}

// Initial network failure (like I/O timeout).
ptr, err = c.LookupPTR(gatewayIP + "1")
r, err = c.Lookup(gatewayIP+"1", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 2, c.stats.Hit.Get())
assert.EqualValues(t, 3, c.stats.Miss.Get())
}

// Check for a cache hit for the network failure.
ptr, err = c.LookupPTR(gatewayIP + "1")
r, err = c.Lookup(gatewayIP+"1", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 3, c.stats.Hit.Get())
assert.EqualValues(t, 3, c.stats.Miss.Get()) // Cache miss.
}

minTTL := defaultConfig.CacheConfig.SuccessCache.MinTTL
minTTL := defaultConfig().cacheConfig.SuccessCache.MinTTL
// Initial success returned TTL=0 with MinTTL.
ptr, err = c.LookupPTR(gatewayIP + "2")
r, err = c.Lookup(gatewayIP+"2", typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)

assert.EqualValues(t, minTTL/time.Second, ptr.TTL)
assert.EqualValues(t, minTTL/time.Second, r.TTL)
assert.EqualValues(t, 3, c.stats.Hit.Get())
assert.EqualValues(t, 4, c.stats.Miss.Get())

Expand All @@ -117,11 +117,11 @@ func TestCache(t *testing.T) {
}

// Cached success from a previous TTL=0 response.
ptr, err = c.LookupPTR(gatewayIP + "2")
r, err = c.Lookup(gatewayIP+"2", typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)
// TTL counts down while in cache.
assert.InDelta(t, minTTL/time.Second, ptr.TTL, 1)
assert.InDelta(t, minTTL/time.Second, r.TTL, 1)
assert.EqualValues(t, 4, c.stats.Hit.Get())
assert.EqualValues(t, 4, c.stats.Miss.Get())
}
Expand Down
Loading

0 comments on commit 883e03c

Please sign in to comment.