From 8b85a6967c9cef56302bf1953405dc3cef0ae2d7 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com> Date: Thu, 13 May 2021 18:58:33 +0200 Subject: [PATCH] Add missing network connection attributes to tcp/udp (#134) * Add IP address resolver helper Signed-off-by: Dominik Rosiek * tcp: use ip address resolver Signed-off-by: Dominik Rosiek * udp: use ip address resolver Signed-off-by: Dominik Rosiek * tcp: docs: update add_attributes description Signed-off-by: Dominik Rosiek * udp: docs: update add_attributes description Signed-off-by: Dominik Rosiek * Add cached IPResolver Signed-off-by: Dominik Rosiek * Initialize IPResolver every time Signed-off-by: Dominik Rosiek * docs: polish markdown for Signed-off-by: Dominik Rosiek * udp: tcp: conditionally initialize resolver Signed-off-by: Dominik Rosiek * Add tests for ip resolver Signed-off-by: Dominik Rosiek * Get rid of race in ip_resolver test Signed-off-by: Dominik Rosiek --- docs/operators/tcp_input.md | 2 +- docs/operators/udp_input.md | 2 +- operator/builtin/input/tcp/tcp.go | 16 ++- operator/builtin/input/tcp/tcp_test.go | 6 +- operator/builtin/input/udp/udp.go | 16 ++- operator/builtin/input/udp/udp_test.go | 6 +- operator/helper/ip_resolver.go | 132 +++++++++++++++++++++++++ operator/helper/ip_resolver_test.go | 75 ++++++++++++++ 8 files changed, 249 insertions(+), 6 deletions(-) create mode 100644 operator/helper/ip_resolver.go create mode 100644 operator/helper/ip_resolver_test.go diff --git a/docs/operators/tcp_input.md b/docs/operators/tcp_input.md index d5dce2115724..ebca99111107 100644 --- a/docs/operators/tcp_input.md +++ b/docs/operators/tcp_input.md @@ -14,7 +14,7 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op | `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] | | `multiline` | | A `multiline` configuration block. See below for details | | `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options | diff --git a/docs/operators/udp_input.md b/docs/operators/udp_input.md index f6ad7abb463a..fa0dc58af11a 100644 --- a/docs/operators/udp_input.md +++ b/docs/operators/udp_input.md @@ -12,7 +12,7 @@ The `udp_input` operator listens for logs from UDP packets. | `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] | | `multiline` | | A `multiline` configuration block. See below for details | | `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options | diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index dac512944e22..952c29c7fc4b 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -103,6 +103,11 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } + var resolver *helper.IPResolver = nil + if c.AddAttributes { + resolver = helper.NewIpResolver() + } + tcpInput := &TCPInput{ InputOperator: inputOperator, address: c.ListenAddress, @@ -113,6 +118,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato backoff: backoff.Backoff{ Max: 3 * time.Second, }, + resolver: resolver, } if c.TLS != nil { @@ -140,6 +146,7 @@ type TCPInput struct { encoding helper.Encoding splitFunc bufio.SplitFunc + resolver *helper.IPResolver } // Start will start listening for log entries over tcp. @@ -249,13 +256,17 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c if t.addAttributes { entry.AddAttribute("net.transport", "IP.TCP") if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { - entry.AddAttribute("net.peer.ip", addr.IP.String()) + ip := addr.IP.String() + entry.AddAttribute("net.peer.ip", ip) entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) + entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIp(ip)) } if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok { + ip := addr.IP.String() entry.AddAttribute("net.host.ip", addr.IP.String()) entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) + entry.AddAttribute("net.host.name", t.resolver.GetHostFromIp(ip)) } } @@ -276,5 +287,8 @@ func (t *TCPInput) Stop() error { } t.wg.Wait() + if t.resolver != nil { + t.resolver.Stop() + } return nil } diff --git a/operator/builtin/input/tcp/tcp_test.go b/operator/builtin/input/tcp/tcp_test.go index 9bbec3ed8e26..2e2cd1bd8732 100644 --- a/operator/builtin/input/tcp/tcp_test.go +++ b/operator/builtin/input/tcp/tcp_test.go @@ -168,12 +168,16 @@ func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T) "net.transport": "IP.TCP", } if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { + ip := addr.IP.String() expectedAttributes["net.host.ip"] = addr.IP.String() expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10) + expectedAttributes["net.host.name"] = tcpInput.resolver.GetHostFromIp(ip) } if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok { - expectedAttributes["net.peer.ip"] = addr.IP.String() + ip := addr.IP.String() + expectedAttributes["net.peer.ip"] = ip expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10) + expectedAttributes["net.peer.name"] = tcpInput.resolver.GetHostFromIp(ip) } require.Equal(t, expectedMessage, entry.Body) require.Equal(t, expectedAttributes, entry.Attributes) diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 72c76d6bc95e..39c209c7e13b 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -86,6 +86,11 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } + var resolver *helper.IPResolver = nil + if c.AddAttributes { + resolver = helper.NewIpResolver() + } + udpInput := &UDPInput{ InputOperator: inputOperator, address: address, @@ -93,6 +98,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato addAttributes: c.AddAttributes, encoding: encoding, splitFunc: splitFunc, + resolver: resolver, } return []operator.Operator{udpInput}, nil } @@ -110,6 +116,7 @@ type UDPInput struct { encoding helper.Encoding splitFunc bufio.SplitFunc + resolver *helper.IPResolver } // Start will start listening for messages on a socket. @@ -168,13 +175,17 @@ func (u *UDPInput) goHandleMessages(ctx context.Context) { if u.addAttributes { entry.AddAttribute("net.transport", "IP.UDP") if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok { + ip := addr.IP.String() entry.AddAttribute("net.host.ip", addr.IP.String()) entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) + entry.AddAttribute("net.host.name", u.resolver.GetHostFromIp(ip)) } if addr, ok := remoteAddr.(*net.UDPAddr); ok { - entry.AddAttribute("net.peer.ip", addr.IP.String()) + ip := addr.IP.String() + entry.AddAttribute("net.peer.ip", ip) entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) + entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIp(ip)) } } @@ -206,5 +217,8 @@ func (u *UDPInput) Stop() error { u.cancel() u.connection.Close() u.wg.Wait() + if u.resolver != nil { + u.resolver.Stop() + } return nil } diff --git a/operator/builtin/input/udp/udp_test.go b/operator/builtin/input/udp/udp_test.go index 8e854b8f98f1..4746c2df60ee 100644 --- a/operator/builtin/input/udp/udp_test.go +++ b/operator/builtin/input/udp/udp_test.go @@ -117,13 +117,17 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T) } // LocalAddr for udpInput.connection is a server address if addr, ok := udpInput.connection.LocalAddr().(*net.UDPAddr); ok { + ip := addr.IP.String() expectedAttributes["net.host.ip"] = addr.IP.String() expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10) + expectedAttributes["net.host.name"] = udpInput.resolver.GetHostFromIp(ip) } // LocalAddr for conn is a client (peer) address if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { - expectedAttributes["net.peer.ip"] = addr.IP.String() + ip := addr.IP.String() + expectedAttributes["net.peer.ip"] = ip expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10) + expectedAttributes["net.peer.name"] = udpInput.resolver.GetHostFromIp(ip) } require.Equal(t, expectedBody, entry.Body) require.Equal(t, expectedAttributes, entry.Attributes) diff --git a/operator/helper/ip_resolver.go b/operator/helper/ip_resolver.go new file mode 100644 index 000000000000..1ac1762ecbd3 --- /dev/null +++ b/operator/helper/ip_resolver.go @@ -0,0 +1,132 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "net" + "sync" + "time" +) + +// cacheEntry keeps information about host and expiration time +type cacheEntry struct { + hostname string + expireTime time.Time +} + +const ( + defaultInvalidationInterval time.Duration = 5 * time.Minute +) + +type IPResolver struct { + cache map[string]cacheEntry + mutex sync.RWMutex + done chan bool + stopped bool + invalidationInterval time.Duration +} + +// Create new resolver +func NewIpResolver() *IPResolver { + r := &IPResolver{ + cache: make(map[string]cacheEntry), + stopped: false, + done: make(chan bool), + invalidationInterval: defaultInvalidationInterval, + } + r.start() + return r +} + +// Stop cache invalidation +func (r *IPResolver) Stop() { + r.mutex.Lock() + if r.stopped { + r.mutex.Unlock() + return + } + + r.stopped = true + r.mutex.Unlock() + r.done <- true +} + +// start runs cache invalidation every 5 minutes +func (r *IPResolver) start() { + ticker := time.NewTicker(r.invalidationInterval) + go func() { + for { + select { + case <-r.done: + ticker.Stop() + return + case <-ticker.C: + r.mutex.Lock() + r.invalidateCache() + r.mutex.Unlock() + } + } + }() +} + +// invalidateCache removes not longer valid entries from cache +func (r *IPResolver) invalidateCache() { + now := time.Now() + for key, entry := range r.cache { + if entry.expireTime.Before(now) { + delete(r.cache, key) + } + } +} + +// GetHostFromIp returns hostname for given ip +// It is taken from cache if exists, +// otherwise lookup is performed and result is put into cache +func (r *IPResolver) GetHostFromIp(ip string) (host string) { + r.mutex.RLock() + entry, ok := r.cache[ip] + if ok { + host = entry.hostname + defer r.mutex.RUnlock() + return host + } + r.mutex.RUnlock() + + host = r.lookupIpAddr(ip) + + r.mutex.Lock() + r.cache[ip] = cacheEntry{ + hostname: host, + expireTime: time.Now().Add(5 * time.Minute), + } + r.mutex.Unlock() + + return host +} + +// lookupIpAddr resturns hostname based on ip address +func (r *IPResolver) lookupIpAddr(ip string) (host string) { + res, err := net.LookupAddr(ip) + if err != nil || len(res) == 0 { + return ip + } + + host = res[0] + // Trim one trailing '.'. + if last := len(host) - 1; last >= 0 && host[last] == '.' { + host = host[:last] + } + return host +} diff --git a/operator/helper/ip_resolver_test.go b/operator/helper/ip_resolver_test.go new file mode 100644 index 000000000000..f4524b8d6528 --- /dev/null +++ b/operator/helper/ip_resolver_test.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "fmt" + "testing" + "time" + "unsafe" + + "github.com/stretchr/testify/require" +) + +func TestIPResolverCacheLookup(t *testing.T) { + resolver := NewIpResolver() + resolver.cache["127.0.0.1"] = cacheEntry{ + hostname: "definitely invalid hostname", + expireTime: time.Now().Add(time.Hour), + } + + require.Equal(t, "definitely invalid hostname", resolver.GetHostFromIp("127.0.0.1")) +} + +func TestIPResolverCacheInvalidation(t *testing.T) { + resolver := NewIpResolver() + + resolver.cache["127.0.0.1"] = cacheEntry{ + hostname: "definitely invalid hostname", + expireTime: time.Now().Add(-1 * time.Hour), + } + + resolver.Stop() + resolver.invalidateCache() + + hostname := resolver.lookupIpAddr("127.0.0.1") + require.Equal(t, hostname, resolver.GetHostFromIp("127.0.0.1")) +} + +func TestIPResolver100Hits(t *testing.T) { + resolver := NewIpResolver() + resolver.cache["127.0.0.1"] = cacheEntry{ + hostname: "definitely invalid hostname", + expireTime: time.Now().Add(time.Hour), + } + + for i := 0; i < 100; i++ { + require.Equal(t, "definitely invalid hostname", resolver.GetHostFromIp("127.0.0.1")) + } +} + +func TestIPResolverWithMultipleStops(t *testing.T) { + resolver := NewIpResolver() + + resolver.Stop() + resolver.Stop() +} + +func TestSizes(t *testing.T) { + fmt.Printf("string %v \n", unsafe.Sizeof("")) + fmt.Printf("cache entry %v \n", unsafe.Sizeof(cacheEntry{})) + fmt.Printf("time %v \n", unsafe.Sizeof(time.Now())) + fmt.Printf("time %v \n", unsafe.Sizeof(time.Now())) +}