diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0a81ac17957a..01ee4bac3b8a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - When running under Elastic-Agent the status is now reported per Unit instead of the whole Beat {issue}35874[35874] {pull}36183[36183] - Add warning message to SysV init scripts for RPM-based systems that lack `/etc/rc.d/init.d/functions`. {issue}35708[35708] {pull}36188[36188] - Mark `translate_sid` processor is GA. {issue}36279[36279] {pull}36280[36280] +- dns processor: Add support for forward lookups (`A`, `AAAA`, and `TXT`). {issue}11416[11416] {pull}36394[36394] *Auditbeat* diff --git a/libbeat/processors/dns/cache.go b/libbeat/processors/dns/cache.go index 6e77af028880..6427fb9f98ed 100644 --- a/libbeat/processors/dns/cache.go +++ b/libbeat/processors/dns/cache.go @@ -24,23 +24,23 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring" ) -type ptrRecord struct { - host string +type successRecord struct { + data []string expires time.Time } -func (r ptrRecord) IsExpired(now time.Time) bool { +func (r successRecord) IsExpired(now time.Time) bool { return now.After(r.expires) } -type ptrCache struct { +type successCache struct { sync.RWMutex - data map[string]ptrRecord + data map[string]successRecord maxSize int minSuccessTTL time.Duration } -func (c *ptrCache) set(now time.Time, key string, ptr *PTR) { +func (c *successCache) set(now time.Time, key string, result *result) { c.Lock() defer c.Unlock() @@ -48,14 +48,14 @@ func (c *ptrCache) set(now time.Time, key string, ptr *PTR) { c.evict() } - c.data[key] = ptrRecord{ - host: ptr.Host, - expires: now.Add(time.Duration(ptr.TTL) * time.Second), + c.data[key] = successRecord{ + data: result.Data, + expires: now.Add(time.Duration(result.TTL) * time.Second), } } // evict removes a single random key from the cache. -func (c *ptrCache) evict() { +func (c *successCache) evict() { var key string for k := range c.data { key = k @@ -64,13 +64,13 @@ func (c *ptrCache) evict() { delete(c.data, key) } -func (c *ptrCache) get(now time.Time, key string) *PTR { +func (c *successCache) get(now time.Time, key string) *result { c.RLock() defer c.RUnlock() r, found := c.data[key] if found && !r.IsExpired(now) { - return &PTR{r.host, uint32(r.expires.Sub(now) / time.Second)} + return &result{r.data, uint32(r.expires.Sub(now) / time.Second)} } return nil } @@ -132,13 +132,13 @@ type cachedError struct { func (ce *cachedError) Error() string { return ce.err.Error() + " (from failure cache)" } func (ce *cachedError) Cause() error { return ce.err } -// PTRLookupCache is a cache for storing and retrieving the results of -// reverse DNS queries. It caches the results of queries regardless of their +// lookupCache is a cache for storing and retrieving the results of +// DNS queries. It caches the results of queries regardless of their // outcome (success or failure). -type PTRLookupCache struct { - success *ptrCache +type lookupCache struct { + success *successCache failure *failureCache - resolver PTRResolver + resolver resolver stats cacheStats } @@ -147,15 +147,15 @@ type cacheStats struct { Miss *monitoring.Int } -// NewPTRLookupCache returns a new cache. -func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRResolver) (*PTRLookupCache, error) { +// newLookupCache returns a new cache. +func newLookupCache(reg *monitoring.Registry, conf cacheConfig, resolver resolver) (*lookupCache, error) { if err := conf.Validate(); err != nil { return nil, err } - c := &PTRLookupCache{ - success: &ptrCache{ - data: make(map[string]ptrRecord, conf.SuccessCache.InitialCapacity), + c := &lookupCache{ + success: &successCache{ + data: make(map[string]successRecord, conf.SuccessCache.InitialCapacity), maxSize: conf.SuccessCache.MaxCapacity, minSuccessTTL: conf.SuccessCache.MinTTL, }, @@ -174,36 +174,36 @@ func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRR return c, nil } -// LookupPTR performs a reverse lookup on the given IP address. A cached result +// Lookup performs a lookup on the given query string. A cached result // will be returned if it is contained in the cache, otherwise a lookup is // performed. -func (c PTRLookupCache) LookupPTR(ip string) (*PTR, error) { +func (c lookupCache) Lookup(q string, qt queryType) (*result, error) { now := time.Now() - ptr := c.success.get(now, ip) - if ptr != nil { + r := c.success.get(now, q) + if r != nil { c.stats.Hit.Inc() - return ptr, nil + return r, nil } - err := c.failure.get(now, ip) + err := c.failure.get(now, q) if err != nil { c.stats.Hit.Inc() return nil, err } c.stats.Miss.Inc() - ptr, err = c.resolver.LookupPTR(ip) + r, err = c.resolver.Lookup(q, qt) if err != nil { - c.failure.set(now, ip, &cachedError{err}) + c.failure.set(now, q, &cachedError{err}) return nil, err } - // We set the ptr.TTL to the minimum TTL in case it is less than that. - ptr.TTL = max(ptr.TTL, uint32(c.success.minSuccessTTL/time.Second)) + // We set the result TTL to the minimum TTL in case it is less than that. + r.TTL = max(r.TTL, uint32(c.success.minSuccessTTL/time.Second)) - c.success.set(now, ip, ptr) - return ptr, nil + c.success.set(now, q, r) + return r, nil } func max(a, b uint32) uint32 { diff --git a/libbeat/processors/dns/cache_test.go b/libbeat/processors/dns/cache_test.go index fdc531c54fb7..ffb081d9fcc9 100644 --- a/libbeat/processors/dns/cache_test.go +++ b/libbeat/processors/dns/cache_test.go @@ -29,85 +29,85 @@ import ( type stubResolver struct{} -func (r *stubResolver) LookupPTR(ip string) (*PTR, error) { +func (r *stubResolver) Lookup(ip string, _ queryType) (*result, error) { switch ip { case gatewayIP: - return &PTR{Host: gatewayName, TTL: gatewayTTL}, nil + return &result{Data: []string{gatewayName}, TTL: gatewayTTL}, nil case gatewayIP + "1": return nil, io.ErrUnexpectedEOF case gatewayIP + "2": - return &PTR{Host: gatewayName, TTL: 0}, nil + return &result{Data: []string{gatewayName}, TTL: 0}, nil } return nil, &dnsError{"fake lookup returned NXDOMAIN"} } func TestCache(t *testing.T) { - c, err := NewPTRLookupCache( + c, err := newLookupCache( monitoring.NewRegistry(), - defaultConfig.CacheConfig, + defaultConfig().cacheConfig, &stubResolver{}) if err != nil { t.Fatal(err) } // Initial success query. - ptr, err := c.LookupPTR(gatewayIP) + r, err := c.Lookup(gatewayIP, typePTR) if assert.NoError(t, err) { - assert.EqualValues(t, gatewayName, ptr.Host) - assert.EqualValues(t, gatewayTTL, ptr.TTL) + assert.EqualValues(t, []string{gatewayName}, r.Data) + assert.EqualValues(t, gatewayTTL, r.TTL) assert.EqualValues(t, 0, c.stats.Hit.Get()) assert.EqualValues(t, 1, c.stats.Miss.Get()) } // Cached success query. - ptr, err = c.LookupPTR(gatewayIP) + r, err = c.Lookup(gatewayIP, typePTR) if assert.NoError(t, err) { - assert.EqualValues(t, gatewayName, ptr.Host) + assert.EqualValues(t, []string{gatewayName}, r.Data) // TTL counts down while in cache. - assert.InDelta(t, gatewayTTL, ptr.TTL, 1) + assert.InDelta(t, gatewayTTL, r.TTL, 1) assert.EqualValues(t, 1, c.stats.Hit.Get()) assert.EqualValues(t, 1, c.stats.Miss.Get()) } // Initial failure query (like a dns error response code). - ptr, err = c.LookupPTR(gatewayIP + "0") + r, err = c.Lookup(gatewayIP+"0", typePTR) if assert.Error(t, err) { - assert.Nil(t, ptr) + assert.Nil(t, r) assert.EqualValues(t, 1, c.stats.Hit.Get()) assert.EqualValues(t, 2, c.stats.Miss.Get()) } // Cached failure query. - ptr, err = c.LookupPTR(gatewayIP + "0") + r, err = c.Lookup(gatewayIP+"0", typePTR) if assert.Error(t, err) { - assert.Nil(t, ptr) + assert.Nil(t, r) assert.EqualValues(t, 2, c.stats.Hit.Get()) assert.EqualValues(t, 2, c.stats.Miss.Get()) } // Initial network failure (like I/O timeout). - ptr, err = c.LookupPTR(gatewayIP + "1") + r, err = c.Lookup(gatewayIP+"1", typePTR) if assert.Error(t, err) { - assert.Nil(t, ptr) + assert.Nil(t, r) assert.EqualValues(t, 2, c.stats.Hit.Get()) assert.EqualValues(t, 3, c.stats.Miss.Get()) } // Check for a cache hit for the network failure. - ptr, err = c.LookupPTR(gatewayIP + "1") + r, err = c.Lookup(gatewayIP+"1", typePTR) if assert.Error(t, err) { - assert.Nil(t, ptr) + assert.Nil(t, r) assert.EqualValues(t, 3, c.stats.Hit.Get()) assert.EqualValues(t, 3, c.stats.Miss.Get()) // Cache miss. } - minTTL := defaultConfig.CacheConfig.SuccessCache.MinTTL + minTTL := defaultConfig().cacheConfig.SuccessCache.MinTTL // Initial success returned TTL=0 with MinTTL. - ptr, err = c.LookupPTR(gatewayIP + "2") + r, err = c.Lookup(gatewayIP+"2", typePTR) if assert.NoError(t, err) { - assert.EqualValues(t, gatewayName, ptr.Host) + assert.EqualValues(t, []string{gatewayName}, r.Data) - assert.EqualValues(t, minTTL/time.Second, ptr.TTL) + assert.EqualValues(t, minTTL/time.Second, r.TTL) assert.EqualValues(t, 3, c.stats.Hit.Get()) assert.EqualValues(t, 4, c.stats.Miss.Get()) @@ -117,11 +117,11 @@ func TestCache(t *testing.T) { } // Cached success from a previous TTL=0 response. - ptr, err = c.LookupPTR(gatewayIP + "2") + r, err = c.Lookup(gatewayIP+"2", typePTR) if assert.NoError(t, err) { - assert.EqualValues(t, gatewayName, ptr.Host) + assert.EqualValues(t, []string{gatewayName}, r.Data) // TTL counts down while in cache. - assert.InDelta(t, minTTL/time.Second, ptr.TTL, 1) + assert.InDelta(t, minTTL/time.Second, r.TTL, 1) assert.EqualValues(t, 4, c.stats.Hit.Get()) assert.EqualValues(t, 4, c.stats.Miss.Get()) } diff --git a/libbeat/processors/dns/config.go b/libbeat/processors/dns/config.go index fb8a13eaf216..07aefe5303a6 100644 --- a/libbeat/processors/dns/config.go +++ b/libbeat/processors/dns/config.go @@ -23,38 +23,40 @@ import ( "strings" "time" + "github.com/miekg/dns" + "github.com/elastic/elastic-agent-libs/mapstr" ) -// Config defines the configuration options for the DNS processor. -type Config struct { - CacheConfig `config:",inline"` +// config defines the configuration options for the DNS processor. +type config struct { + cacheConfig `config:",inline"` Nameservers []string `config:"nameservers"` // Required on Windows. /etc/resolv.conf is used if none are given. Timeout time.Duration `config:"timeout"` // Per request timeout (with 2 nameservers the total timeout would be 2x). - Type string `config:"type" validate:"required"` // Reverse is the only supported type currently. - Action FieldAction `config:"action"` // Append or replace (defaults to append) when target exists. + Type queryType `config:"type" validate:"required"` // One of A, AAAA, TXT or PTR (or reverse). + Action fieldAction `config:"action"` // Append or replace (defaults to append) when target exists. TagOnFailure []string `config:"tag_on_failure"` // Tags to append when a failure occurs. Fields mapstr.M `config:"fields"` // Mapping of source fields to target fields. Transport string `config:"transport"` // Can be tls or udp. reverseFlat map[string]string } -// FieldAction defines the behavior when the target field exists. -type FieldAction uint8 +// fieldAction defines the behavior when the target field exists. +type fieldAction uint8 -// List of FieldAction types. +// List of fieldAction types. const ( - ActionAppend FieldAction = iota - ActionReplace + actionAppend fieldAction = iota + actionReplace ) -var fieldActionNames = map[FieldAction]string{ - ActionAppend: "append", - ActionReplace: "replace", +var fieldActionNames = map[fieldAction]string{ + actionAppend: "append", + actionReplace: "replace", } // String returns a field action name. -func (fa FieldAction) String() string { +func (fa fieldAction) String() string { name, found := fieldActionNames[fa] if found { return name @@ -62,27 +64,62 @@ func (fa FieldAction) String() string { return "unknown (" + strconv.Itoa(int(fa)) + ")" } -// Unpack unpacks a string to a FieldAction. -func (fa *FieldAction) Unpack(v string) error { +// Unpack unpacks a string to a fieldAction. +func (fa *fieldAction) Unpack(v string) error { switch strings.ToLower(v) { case "", "append": - *fa = ActionAppend + *fa = actionAppend case "replace": - *fa = ActionReplace + *fa = actionReplace default: return fmt.Errorf("invalid dns field action value '%v'", v) } return nil } -// CacheConfig defines the success and failure caching parameters. -type CacheConfig struct { - SuccessCache CacheSettings `config:"success_cache"` - FailureCache CacheSettings `config:"failure_cache"` +// queryType represents a DNS query type. +type queryType uint16 + +const ( + typePTR = queryType(dns.TypePTR) + typeA = queryType(dns.TypeA) + typeAAAA = queryType(dns.TypeAAAA) + typeTXT = queryType(dns.TypeTXT) +) + +func (qt queryType) String() string { + if name := dns.TypeToString[uint16(qt)]; name != "" { + return name + } + return strconv.FormatUint(uint64(qt), 10) +} + +// Unpack unpacks a string to a queryType. +func (qt *queryType) Unpack(v string) error { + switch strings.ToLower(v) { + case "a": + *qt = typeA + case "aaaa": + *qt = typeAAAA + case "reverse", "ptr": + *qt = typePTR + case "txt": + *qt = typeTXT + default: + return fmt.Errorf("invalid dns lookup type '%s' specified in "+ + "config (valid values are: A, AAAA, PTR, reverse, TXT)", v) + } + return nil +} + +// cacheConfig defines the success and failure caching parameters. +type cacheConfig struct { + SuccessCache cacheSettings `config:"success_cache"` + FailureCache cacheSettings `config:"failure_cache"` } -// CacheSettings define the caching behavior for an individual cache. -type CacheSettings struct { +// cacheSettings define the caching behavior for an individual cache. +type cacheSettings struct { // TTL value for items in cache. Not used for success because we use TTL // from the DNS record. TTL time.Duration `config:"ttl"` @@ -99,16 +136,7 @@ type CacheSettings struct { } // Validate validates the data contained in the config. -func (c *Config) Validate() error { - // Validate lookup type. - c.Type = strings.ToLower(c.Type) - switch c.Type { - case "reverse": - default: - return fmt.Errorf("invalid dns lookup type '%v' specified in "+ - "config (valid values are: reverse)", c.Type) - } - +func (c *config) Validate() error { // Flatten the mapping of source fields to target fields. c.reverseFlat = map[string]string{} for k, v := range c.Fields.Flatten() { @@ -131,8 +159,8 @@ func (c *Config) Validate() error { return nil } -// Validate validates the data contained in the CacheConfig. -func (c *CacheConfig) Validate() error { +// Validate validates the data contained in the cacheConfig. +func (c *cacheConfig) Validate() error { if c.SuccessCache.MinTTL <= 0 { return fmt.Errorf("success_cache.min_ttl must be > 0") } @@ -157,20 +185,22 @@ func (c *CacheConfig) Validate() error { return nil } -var defaultConfig = Config{ - CacheConfig: CacheConfig{ - SuccessCache: CacheSettings{ - MinTTL: time.Minute, - InitialCapacity: 1000, - MaxCapacity: 10000, +func defaultConfig() config { + return config{ + cacheConfig: cacheConfig{ + SuccessCache: cacheSettings{ + MinTTL: time.Minute, + InitialCapacity: 1000, + MaxCapacity: 10000, + }, + FailureCache: cacheSettings{ + MinTTL: time.Minute, + TTL: time.Minute, + InitialCapacity: 1000, + MaxCapacity: 10000, + }, }, - FailureCache: CacheSettings{ - MinTTL: time.Minute, - TTL: time.Minute, - InitialCapacity: 1000, - MaxCapacity: 10000, - }, - }, - Transport: "udp", - Timeout: 500 * time.Millisecond, + Transport: "udp", + Timeout: 500 * time.Millisecond, + } } diff --git a/libbeat/processors/dns/dns.go b/libbeat/processors/dns/dns.go index ee8dd918ebc3..d4f3d2ba57b9 100644 --- a/libbeat/processors/dns/dns.go +++ b/libbeat/processors/dns/dns.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/processors" jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" - "github.com/elastic/elastic-agent-libs/config" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" @@ -44,14 +44,14 @@ func init() { } type processor struct { - Config - resolver PTRResolver + config + resolver resolver log *logp.Logger } // New constructs a new DNS processor. -func New(cfg *config.C) (beat.Processor, error) { - c := defaultConfig +func New(cfg *conf.C) (beat.Processor, error) { + c := defaultConfig() if err := cfg.Unpack(&c); err != nil { return nil, fmt.Errorf("fail to unpack the dns configuration: %w", err) } @@ -64,17 +64,17 @@ func New(cfg *config.C) (beat.Processor, error) { ) log.Debugf("DNS processor config: %+v", c) - resolver, err := NewMiekgResolver(metrics, c.Timeout, c.Transport, c.Nameservers...) + resolver, err := newMiekgResolver(metrics, c.Timeout, c.Transport, c.Nameservers...) if err != nil { return nil, err } - cache, err := NewPTRLookupCache(metrics.NewRegistry("cache"), c.CacheConfig, resolver) + cache, err := newLookupCache(metrics.NewRegistry("cache"), c.cacheConfig, resolver) if err != nil { return nil, err } - return &processor{Config: c, resolver: cache, log: log}, nil + return &processor{config: c, resolver: cache, log: log}, nil } func (p *processor) Run(event *beat.Event) (*beat.Event, error) { @@ -88,32 +88,36 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (p *processor) processField(source, target string, action FieldAction, event *beat.Event) error { +func (p *processor) processField(source, target string, action fieldAction, event *beat.Event) error { v, err := event.GetValue(source) if err != nil { //nolint:nilerr // an empty source field isn't considered an error for this processor return nil } - maybeIP, ok := v.(string) + strVal, ok := v.(string) if !ok { return nil } - ptrRecord, err := p.resolver.LookupPTR(maybeIP) + result, err := p.resolver.Lookup(strVal, p.Type) if err != nil { - return fmt.Errorf("reverse lookup of %v value '%v' failed: %w", source, maybeIP, err) + return fmt.Errorf("dns lookup (%s) of %s value '%s' failed: %w", p.Type, source, strVal, err) } - return setFieldValue(action, event, target, ptrRecord.Host) + // PTR lookups return a scalar. All other lookup types return a string slice. + if p.Type == typePTR { + return setFieldValue(action, event, target, result.Data[0]) + } + return setFieldSliceValue(action, event, target, result.Data) } -func setFieldValue(action FieldAction, event *beat.Event, key string, value string) error { +func setFieldValue(action fieldAction, event *beat.Event, key, value string) error { switch action { - case ActionReplace: + case actionReplace: _, err := event.PutValue(key, value) return err - case ActionAppend: + case actionAppend: old, err := event.PutValue(key, value) if err != nil { return err @@ -129,7 +133,32 @@ func setFieldValue(action FieldAction, event *beat.Event, key string, value stri } return err default: - panic(fmt.Errorf("Unexpected dns field action value encountered: %v", action)) + panic(fmt.Errorf("unexpected dns field action value encountered: %s", action)) + } +} + +func setFieldSliceValue(action fieldAction, event *beat.Event, key string, value []string) error { + switch action { + case actionReplace: + _, err := event.PutValue(key, value) + return err + case actionAppend: + old, err := event.PutValue(key, value) + if err != nil { + return err + } + + if old != nil { + switch v := old.(type) { + case string: + _, err = event.PutValue(key, append([]string{v}, value...)) + case []string: + _, err = event.PutValue(key, append(v, value...)) + } + } + return err + default: + panic(fmt.Errorf("unexpected dns field action value encountered: %s", action)) } } diff --git a/libbeat/processors/dns/dns_test.go b/libbeat/processors/dns/dns_test.go index fa3b2d67f41b..e4571cb4240c 100644 --- a/libbeat/processors/dns/dns_test.go +++ b/libbeat/processors/dns/dns_test.go @@ -32,12 +32,14 @@ import ( ) func TestDNSProcessorRun(t *testing.T) { + c := defaultConfig() + c.Type = typePTR p := &processor{ - Config: defaultConfig, + config: c, resolver: &stubResolver{}, log: logp.NewLogger(logName), } - p.Config.reverseFlat = map[string]string{ + p.config.reverseFlat = map[string]string{ "source.ip": "source.domain", } t.Log(p.String()) @@ -58,7 +60,7 @@ func TestDNSProcessorRun(t *testing.T) { const forwardDomain = "www." + gatewayName t.Run("append", func(t *testing.T) { - p.Config.Action = ActionAppend + p.config.Action = actionAppend event, err := p.Run(&beat.Event{ Fields: mapstr.M{ @@ -77,7 +79,7 @@ func TestDNSProcessorRun(t *testing.T) { }) t.Run("replace", func(t *testing.T) { - p.Config.Action = ActionReplace + p.config.Action = actionReplace event, err := p.Run(&beat.Event{ Fields: mapstr.M{ @@ -94,13 +96,14 @@ func TestDNSProcessorRun(t *testing.T) { }) t.Run("metadata target", func(t *testing.T) { - config := defaultConfig + config := defaultConfig() + config.Type = typePTR config.reverseFlat = map[string]string{ "@metadata.ip": "@metadata.domain", } p := &processor{ - Config: config, + config: config, resolver: &stubResolver{}, log: logp.NewLogger(logName), } @@ -121,17 +124,16 @@ func TestDNSProcessorRun(t *testing.T) { assert.Equal(t, expMeta, newEvent.Meta) assert.Equal(t, event.Fields, newEvent.Fields) }) - } func TestDNSProcessorTagOnFailure(t *testing.T) { p := &processor{ - Config: defaultConfig, + config: defaultConfig(), resolver: &stubResolver{}, log: logp.NewLogger(logName), } - p.Config.TagOnFailure = []string{"_lookup_failed"} - p.Config.reverseFlat = map[string]string{ + p.config.TagOnFailure = []string{"_lookup_failed"} + p.config.reverseFlat = map[string]string{ "source.ip": "source.domain", "destination.ip": "destination.domain", } @@ -149,7 +151,7 @@ func TestDNSProcessorTagOnFailure(t *testing.T) { v, _ := event.GetValue("tags") if assert.Len(t, v, 1) { - assert.ElementsMatch(t, v, p.Config.TagOnFailure) + assert.ElementsMatch(t, v, p.config.TagOnFailure) } } @@ -157,14 +159,14 @@ func TestDNSProcessorRunInParallel(t *testing.T) { // This is a simple smoke test to make sure that there are no concurrency // issues. It is most effective when run with the race detector. - conf := defaultConfig + conf := defaultConfig() reg := monitoring.NewRegistry() - cache, err := NewPTRLookupCache(reg, conf.CacheConfig, &stubResolver{}) + cache, err := newLookupCache(reg, conf.cacheConfig, &stubResolver{}) if err != nil { t.Fatal(err) } - p := &processor{Config: conf, resolver: cache, log: logp.NewLogger(logName)} - p.Config.reverseFlat = map[string]string{"source.ip": "source.domain"} + p := &processor{config: conf, resolver: cache, log: logp.NewLogger(logName)} + p.config.reverseFlat = map[string]string{"source.ip": "source.domain"} const numGoroutines = 10 const numEvents = 500 diff --git a/libbeat/processors/dns/doc.go b/libbeat/processors/dns/doc.go index 8c895b268000..781d7e5284dd 100644 --- a/libbeat/processors/dns/doc.go +++ b/libbeat/processors/dns/doc.go @@ -16,7 +16,7 @@ // under the License. // Package dns implements a processor that can perform DNS lookups by sending -// a DNS request over UDP to a recursive nameserver. Each instance of the +// a DNS request over UDP or TLS to a recursive nameserver. Each instance of the // processor is independent (no shared cache) so it's best to only define one // instance of the processor. // diff --git a/libbeat/processors/dns/docs/dns.asciidoc b/libbeat/processors/dns/docs/dns.asciidoc index 8d03e8b4c0a4..9350b109a858 100644 --- a/libbeat/processors/dns/docs/dns.asciidoc +++ b/libbeat/processors/dns/docs/dns.asciidoc @@ -5,10 +5,10 @@ dns ++++ -The `dns` processor performs reverse DNS lookups of IP addresses. It caches the -responses that it receives in accordance to the time-to-live (TTL) value -contained in the response. It also caches failures that occur during lookups. -Each instance of this processor maintains its own independent cache. +The `dns` processor performs DNS queries. It caches the responses that it +receives in accordance to the time-to-live (TTL) value contained in the +response. It also caches failures that occur during lookups. Each instance +of this processor maintains its own independent cache. The processor uses its own DNS resolver to send requests to nameservers and does not use the operating system's resolver. It does not read any values contained @@ -24,6 +24,16 @@ throughput you can achieve is 500 events per second (1000 milliseconds / 2 milliseconds). If you have a high cache hit ratio then your throughput can be higher. +The processor can send the following query types: + +- `A` - IPv4 addresses +- `AAAA` - IPv6 addresses +- `TXT` - arbitrary human-readable text data +- `PTR` - reverse IP address lookups + +The output value is a list of strings for all query types except `PTR`. For +`PTR` queries the output value is a string. + This is a minimal configuration example that resolves the IP addresses contained in two fields. @@ -33,8 +43,8 @@ processors: - dns: type: reverse fields: - source.ip: source.hostname - destination.ip: destination.hostname + source.ip: source.domain + destination.ip: destination.domain ---- Next is a configuration example showing all options. @@ -47,8 +57,8 @@ processors: action: append transport: tls fields: - server.ip: server.hostname - client.ip: client.hostname + server.ip: server.domain + client.ip: client.domain success_cache: capacity.initial: 1000 capacity.max: 10000 @@ -64,8 +74,8 @@ processors: The `dns` processor has the following configuration settings: -`type`:: The type of DNS lookup to perform. The only supported type is -`reverse` which queries for a PTR record. +`type`:: The type of DNS query to perform. The supported types are `A`, `AAAA`, +`PTR` (or `reverse`), and `TXT`. `action`:: This defines the behavior of the processor when the target field already exists in the event. The options are `append` (default) and `replace`. @@ -82,8 +92,10 @@ the memory for this number of items. Default value is `1000`. cache can hold. When the maximum capacity is reached a random item is evicted. Default value is `10000`. -`success_cache.min_ttl`:: The duration of the minimum alternative cache TTL for successful DNS responses. Ensures that `TTL=0` successful reverse DNS responses can be cached. -Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Default value is `1m`. +`success_cache.min_ttl`:: The duration of the minimum alternative cache TTL for +successful DNS responses. Ensures that `TTL=0` successful reverse DNS responses +can be cached. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". +Default value is `1m`. `failure_cache.capacity.initial`:: The initial number of items that the failure cache will be allocated to hold. When initialized the processor will allocate @@ -107,7 +119,7 @@ for each DNS request so if you have 2 nameservers then the total timeout will be "h". Default value is `500ms`. `tag_on_failure`:: A list of tags to add to the event when any lookup fails. The -tags are only added once even if multiple lookups fail. By default no tags are +tags are only added once even if multiple lookups fail. By default, no tags are added upon failure. `transport`:: The type of transport connection that should be used can either be diff --git a/libbeat/processors/dns/resolver.go b/libbeat/processors/dns/resolver.go index 7e0f160315cb..537051c5a5a3 100644 --- a/libbeat/processors/dns/resolver.go +++ b/libbeat/processors/dns/resolver.go @@ -19,6 +19,7 @@ package dns import ( "errors" + "math" "net" "strconv" "strings" @@ -34,20 +35,20 @@ import ( const etcResolvConf = "/etc/resolv.conf" -// PTR represents a DNS pointer record (IP to hostname). -type PTR struct { - Host string // Hostname. - TTL uint32 // Time to live in seconds. +// result represents a DNS lookup result. +type result struct { + Data []string // Hostname. + TTL uint32 // Time to live in seconds. } -// PTRResolver performs PTR record lookups. -type PTRResolver interface { - LookupPTR(ip string) (*PTR, error) +// resolver performs result record lookups. +type resolver interface { + Lookup(q string, qt queryType) (*result, error) } -// MiekgResolver is a PTRResolver that is implemented using github.com/miekg/dns +// miekgResolver is a resolver that is implemented using github.com/miekg/dns // to send requests to DNS servers. It does not use the Go resolver. -type MiekgResolver struct { +type miekgResolver struct { client *dns.Client servers []string @@ -57,14 +58,14 @@ type MiekgResolver struct { } type nameserverStats struct { - success *monitoring.Int // Number of responses from server. - failure *monitoring.Int // Number of failures (e.g. I/O timeout) (not NXDOMAIN). - ptrResponse metrics.Sample // Histogram of response times. + success *monitoring.Int // Number of responses from server. + failure *monitoring.Int // Number of failures (e.g. I/O timeout) (not NXDOMAIN). + requestDuration metrics.Sample // Histogram of response times. } -// NewMiekgResolver returns a new MiekgResolver. It returns an error if no +// newMiekgResolver returns a new miekgResolver. It returns an error if no // nameserver are given and none can be read from /etc/resolv.conf. -func NewMiekgResolver(reg *monitoring.Registry, timeout time.Duration, transport string, servers ...string) (*MiekgResolver, error) { +func newMiekgResolver(reg *monitoring.Registry, timeout time.Duration, transport string, servers ...string) (*miekgResolver, error) { // Use /etc/resolv.conf if no nameservers are given. (Won't work for Windows). if len(servers) == 0 { config, err := dns.ClientConfigFromFile(etcResolvConf) @@ -94,7 +95,7 @@ func NewMiekgResolver(reg *monitoring.Registry, timeout time.Duration, transport } if timeout == 0 { - timeout = defaultConfig.Timeout + timeout = defaultConfig().Timeout } var clientTransferType string @@ -105,7 +106,7 @@ func NewMiekgResolver(reg *monitoring.Registry, timeout time.Duration, transport clientTransferType = "udp" } - return &MiekgResolver{ + return &miekgResolver{ client: &dns.Client{ Net: clientTransferType, Timeout: timeout, @@ -129,37 +130,42 @@ func (e *dnsError) Error() string { return "dns: " + e.err } -// LookupPTR performs a reverse lookup on the given IP address. -func (res *MiekgResolver) LookupPTR(ip string) (*PTR, error) { +// Lookup performs a DNS query. +func (res *miekgResolver) Lookup(q string, qt queryType) (*result, error) { if len(res.servers) == 0 { return nil, errors.New("no dns servers configured") } - // Create PTR (reverse) DNS request. + // Create DNS request. m := new(dns.Msg) - arpa, err := dns.ReverseAddr(ip) - if err != nil { - return nil, err + switch qt { + case typePTR: + arpa, err := dns.ReverseAddr(q) + if err != nil { + return nil, err + } + m.SetQuestion(arpa, dns.TypePTR) + case typeA, typeAAAA, typeTXT: + m.SetQuestion(dns.Fqdn(q), uint16(qt)) } - m.SetQuestion(arpa, dns.TypePTR) m.RecursionDesired = true // Try the nameservers until we get a response. - var rtnErr error + var nameserverErr error for _, server := range res.servers { stats := res.getOrCreateNameserverStats(server) r, rtt, err := res.client.Exchange(m, server) if err != nil { - // Try next server if any. Otherwise return retErr. - rtnErr = err + // Try next server if any. Otherwise, return nameserverErr. + nameserverErr = err stats.failure.Inc() continue } // We got a response. stats.success.Inc() - stats.ptrResponse.Update(int64(rtt)) + stats.requestDuration.Update(int64(rtt)) if r.Rcode != dns.RcodeSuccess { name, found := dns.RcodeToString[r.Rcode] if !found { @@ -168,27 +174,48 @@ func (res *MiekgResolver) LookupPTR(ip string) (*PTR, error) { return nil, &dnsError{"nameserver " + server + " returned " + name} } + var rtn result + rtn.TTL = math.MaxUint32 for _, a := range r.Answer { - if ptr, ok := a.(*dns.PTR); ok { - return &PTR{ - Host: strings.TrimSuffix(ptr.Ptr, "."), - TTL: ptr.Hdr.Ttl, + // Ignore records that don't match the query type. + if a.Header().Rrtype != uint16(qt) { + continue + } + + switch rr := a.(type) { + case *dns.PTR: + return &result{ + Data: []string{strings.TrimSuffix(rr.Ptr, ".")}, + TTL: rr.Hdr.Ttl, }, nil + case *dns.A: + rtn.Data = append(rtn.Data, rr.A.String()) + rtn.TTL = min(rtn.TTL, rr.Hdr.Ttl) + case *dns.AAAA: + rtn.Data = append(rtn.Data, rr.AAAA.String()) + rtn.TTL = min(rtn.TTL, rr.Hdr.Ttl) + case *dns.TXT: + rtn.Data = append(rtn.Data, rr.Txt...) + rtn.TTL = min(rtn.TTL, rr.Hdr.Ttl) } } - return nil, &dnsError{"no PTR record was found in the response"} + if len(rtn.Data) == 0 { + return nil, &dnsError{"no " + qt.String() + " resource records were found in the response"} + } + + return &rtn, nil } - if rtnErr != nil { - return nil, rtnErr + if nameserverErr != nil { + return nil, nameserverErr } // This should never get here. - panic("LookupPTR should have returned a response.") + panic("dns resolver Lookup() should have returned a response.") } -func (res *MiekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats { +func (res *miekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats { // Trim port. ns = ns[:strings.LastIndex(ns, ":")] @@ -212,13 +239,22 @@ func (res *MiekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats // Create stats for the nameserver. reg := res.registry.NewRegistry(strings.Replace(ns, ".", "_", -1)) stats = &nameserverStats{ - success: monitoring.NewInt(reg, "success"), - failure: monitoring.NewInt(reg, "failure"), - ptrResponse: metrics.NewUniformSample(1028), + success: monitoring.NewInt(reg, "success"), + failure: monitoring.NewInt(reg, "failure"), + requestDuration: metrics.NewUniformSample(1028), } - adapter.NewGoMetrics(reg, "response.ptr", adapter.Accept). - Register("histogram", metrics.NewHistogram(stats.ptrResponse)) + + //nolint:errcheck // Register should never fail because this is a new empty registry. + adapter.NewGoMetrics(reg, "request_duration", adapter.Accept). + Register("histogram", metrics.NewHistogram(stats.requestDuration)) res.nsStats[ns] = stats return stats } + +func min(a, b uint32) uint32 { + if a < b { + return a + } + return b +} diff --git a/libbeat/processors/dns/resolver_test.go b/libbeat/processors/dns/resolver_test.go index 1e2e56b86282..1786883a671c 100644 --- a/libbeat/processors/dns/resolver_test.go +++ b/libbeat/processors/dns/resolver_test.go @@ -19,41 +19,45 @@ package dns import ( "crypto/tls" + "errors" "net" "strings" "testing" "github.com/miekg/dns" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/monitoring" ) -var _ PTRResolver = (*MiekgResolver)(nil) +var _ resolver = (*miekgResolver)(nil) func TestMiekgResolverLookupPTR(t *testing.T) { - stop, addr, err := ServeDNS(FakeDNSHandler) + stop, addr, err := serveDNS(fakeDNSHandler) if err != nil { t.Fatal(err) } - defer stop() + defer func() { + require.NoError(t, stop()) + }() reg := monitoring.NewRegistry() - res, err := NewMiekgResolver(reg.NewRegistry(logName), 0, "udp", addr) + res, err := newMiekgResolver(reg.NewRegistry(logName), 0, "udp", addr) if err != nil { t.Fatal(err) } // Success - ptr, err := res.LookupPTR("8.8.8.8") + ptr, err := res.Lookup("8.8.8.8", typePTR) if err != nil { t.Fatal(err) } - assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Host) + assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Data[0]) assert.EqualValues(t, 19273, ptr.TTL) // NXDOMAIN - _, err = res.LookupPTR("1.1.1.1") + _, err = res.Lookup("1.1.1.1", typePTR) if assert.Error(t, err) { assert.Contains(t, err.Error(), "NXDOMAIN") } @@ -70,42 +74,45 @@ func TestMiekgResolverLookupPTR(t *testing.T) { } func TestMiekgResolverLookupPTRTLS(t *testing.T) { - //Build Cert - cert, err := tls.X509KeyPair(CertPEMBlock, KeyPEMBlock) + // Build Cert + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) if err != nil { t.Fatalf("unable to build certificate: %v", err) } config := tls.Config{ Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS13, } // serve TLS with cert - stop, addr, err := ServeDNSTLS(FakeDNSHandler, &config) + stop, addr, err := serveDNSTLS(fakeDNSHandler, &config) if err != nil { t.Fatal(err) } - defer stop() + defer func() { + require.NoError(t, stop()) + }() reg := monitoring.NewRegistry() - res, err := NewMiekgResolver(reg.NewRegistry(logName), 0, "tls", addr) + res, err := newMiekgResolver(reg.NewRegistry(logName), 0, "tls", addr) if err != nil { t.Fatal(err) } - // we use a self signed certificate for localhost - // we have to pass InsecureSSL to the DNS resolver + //nolint:gosec // Don't verify the self-signed cert. This is only for testing purposes. res.client.TLSConfig = &tls.Config{ InsecureSkipVerify: true, } + // Success - ptr, err := res.LookupPTR("8.8.8.8") + ptr, err := res.Lookup("8.8.8.8", typePTR) if err != nil { t.Fatal(err) } - assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Host) + assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Data[0]) assert.EqualValues(t, 19273, ptr.TTL) // NXDOMAIN - _, err = res.LookupPTR("1.1.1.1") + _, err = res.Lookup("1.1.1.1", typePTR) if assert.Error(t, err) { assert.Contains(t, err.Error(), "NXDOMAIN") } @@ -121,7 +128,7 @@ func TestMiekgResolverLookupPTRTLS(t *testing.T) { assert.Equal(t, 12, metricCount) } -func ServeDNS(h dns.HandlerFunc) (cancel func() error, addr string, err error) { +func serveDNS(h dns.HandlerFunc) (cancel func() error, addr string, err error) { // Setup listener on ephemeral port. a, err := net.ResolveUDPAddr("udp4", "localhost:0") @@ -136,11 +143,23 @@ func ServeDNS(h dns.HandlerFunc) (cancel func() error, addr string, err error) { var s dns.Server s.PacketConn = l s.Handler = h - go s.ActivateAndServe() - return s.Shutdown, s.PacketConn.LocalAddr().String(), err + + serveErr := make(chan error, 1) + go func() { + defer close(serveErr) + serveErr <- s.ActivateAndServe() + }() + + cancel = func() error { + return errors.Join( + s.Shutdown(), + <-serveErr, + ) + } + return cancel, s.PacketConn.LocalAddr().String(), err } -func ServeDNSTLS(h dns.HandlerFunc, config *tls.Config) (cancel func() error, addr string, err error) { +func serveDNSTLS(h dns.HandlerFunc, config *tls.Config) (cancel func() error, addr string, err error) { // Setup listener on ephemeral port. l, err := tls.Listen("tcp", "localhost:0", config) if err != nil { @@ -150,11 +169,23 @@ func ServeDNSTLS(h dns.HandlerFunc, config *tls.Config) (cancel func() error, ad var s dns.Server s.Handler = h s.Listener = l - go s.ActivateAndServe() - return s.Shutdown, l.Addr().String(), err + + serveErr := make(chan error, 1) + go func() { + defer close(serveErr) + serveErr <- s.ActivateAndServe() + }() + + cancel = func() error { + return errors.Join( + s.Shutdown(), + <-serveErr, + ) + } + return cancel, l.Addr().String(), err } -func FakeDNSHandler(w dns.ResponseWriter, msg *dns.Msg) { +func fakeDNSHandler(w dns.ResponseWriter, msg *dns.Msg) { m := new(dns.Msg) m.SetReply(msg) switch { @@ -164,11 +195,11 @@ func FakeDNSHandler(w dns.ResponseWriter, msg *dns.Msg) { default: m.SetRcode(msg, dns.RcodeNameError) } - w.WriteMsg(m) + _ = w.WriteMsg(m) } var ( - KeyPEMBlock = []byte(`-----BEGIN RSA PRIVATE KEY----- + keyPEMBlock = []byte(`-----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEA2g2zpEtWaIUx5o6MEnWnGsf0Ba1SDc3AwgOmxeNIPBJYVCrk sWe8Qt/5nymReVFcum76995ncr/zT+e4e8l+hXuGzTKZJpOj27Igb0/wa3j2hIcu rnbzfwkJ+KMag2UUKdSo31ChMU+64bwziEXunF347Ot7dBLtw3PJKbabNCP+/oil @@ -196,7 +227,7 @@ LatVl7h6ud25ZJYnP7DelGxHsZnDXNirLFlSB0CL4F6I5xNoBvCoH0Q8ckDSh4C7 tlAyD5m9gwvgdkNFWq6/lcUPxGksTtTk8dGnhJz8pGlZvp6+dZCM -----END RSA PRIVATE KEY-----`) - CertPEMBlock = []byte(`-----BEGIN CERTIFICATE----- + certPEMBlock = []byte(`-----BEGIN CERTIFICATE----- MIIDaTCCAlGgAwIBAgIQGqg47wLgbjwwrZASuakmwjANBgkqhkiG9w0BAQsFADAy MRQwEgYDVQQKEwtMb2cgQ291cmllcjEaMBgGA1UEAxMRYmVhdHMuZWxhc3RpYy5j b20wHhcNMjAwNjIzMDY0NDEwWhcNMjEwNjIzMDY0NDEwWjAyMRQwEgYDVQQKEwtM