Skip to content

Commit

Permalink
Add missing network connection attributes to tcp/udp (#134)
Browse files Browse the repository at this point in the history
* Add IP address resolver helper

Signed-off-by: Dominik Rosiek <[email protected]>

* tcp: use ip address resolver

Signed-off-by: Dominik Rosiek <[email protected]>

* udp: use ip address resolver

Signed-off-by: Dominik Rosiek <[email protected]>

* tcp: docs: update add_attributes description

Signed-off-by: Dominik Rosiek <[email protected]>

* udp: docs: update add_attributes description

Signed-off-by: Dominik Rosiek <[email protected]>

* Add cached IPResolver

Signed-off-by: Dominik Rosiek <[email protected]>

* Initialize IPResolver every time

Signed-off-by: Dominik Rosiek <[email protected]>

* docs: polish markdown for

Signed-off-by: Dominik Rosiek <[email protected]>

* udp: tcp: conditionally initialize resolver

Signed-off-by: Dominik Rosiek <[email protected]>

* Add tests for ip resolver

Signed-off-by: Dominik Rosiek <[email protected]>

* Get rid of race in ip_resolver test

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
sumo-drosiek authored May 13, 2021
1 parent 0ff9df0 commit 8b85a69
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 6 deletions.
2 changes: 1 addition & 1 deletion docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options |

Expand Down
2 changes: 1 addition & 1 deletion docs/operators/udp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The `udp_input` operator listens for logs from UDP packets.
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes] |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options |

Expand Down
16 changes: 15 additions & 1 deletion operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

var resolver *helper.IPResolver = nil
if c.AddAttributes {
resolver = helper.NewIpResolver()
}

tcpInput := &TCPInput{
InputOperator: inputOperator,
address: c.ListenAddress,
Expand All @@ -113,6 +118,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
backoff: backoff.Backoff{
Max: 3 * time.Second,
},
resolver: resolver,
}

if c.TLS != nil {
Expand Down Expand Up @@ -140,6 +146,7 @@ type TCPInput struct {

encoding helper.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver
}

// Start will start listening for log entries over tcp.
Expand Down Expand Up @@ -249,13 +256,17 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c
if t.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
entry.AddAttribute("net.peer.ip", addr.IP.String())
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIp(ip))
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", t.resolver.GetHostFromIp(ip))
}
}

Expand All @@ -276,5 +287,8 @@ func (t *TCPInput) Stop() error {
}

t.wg.Wait()
if t.resolver != nil {
t.resolver.Stop()
}
return nil
}
6 changes: 5 additions & 1 deletion operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,16 @@ func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T)
"net.transport": "IP.TCP",
}
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
expectedAttributes["net.host.ip"] = addr.IP.String()
expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10)
expectedAttributes["net.host.name"] = tcpInput.resolver.GetHostFromIp(ip)
}
if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
expectedAttributes["net.peer.ip"] = addr.IP.String()
ip := addr.IP.String()
expectedAttributes["net.peer.ip"] = ip
expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10)
expectedAttributes["net.peer.name"] = tcpInput.resolver.GetHostFromIp(ip)
}
require.Equal(t, expectedMessage, entry.Body)
require.Equal(t, expectedAttributes, entry.Attributes)
Expand Down
16 changes: 15 additions & 1 deletion operator/builtin/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,19 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

var resolver *helper.IPResolver = nil
if c.AddAttributes {
resolver = helper.NewIpResolver()
}

udpInput := &UDPInput{
InputOperator: inputOperator,
address: address,
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
resolver: resolver,
}
return []operator.Operator{udpInput}, nil
}
Expand All @@ -110,6 +116,7 @@ type UDPInput struct {

encoding helper.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver
}

// Start will start listening for messages on a socket.
Expand Down Expand Up @@ -168,13 +175,17 @@ func (u *UDPInput) goHandleMessages(ctx context.Context) {
if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", u.resolver.GetHostFromIp(ip))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
entry.AddAttribute("net.peer.ip", addr.IP.String())
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", u.resolver.GetHostFromIp(ip))
}
}

Expand Down Expand Up @@ -206,5 +217,8 @@ func (u *UDPInput) Stop() error {
u.cancel()
u.connection.Close()
u.wg.Wait()
if u.resolver != nil {
u.resolver.Stop()
}
return nil
}
6 changes: 5 additions & 1 deletion operator/builtin/input/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,17 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T)
}
// LocalAddr for udpInput.connection is a server address
if addr, ok := udpInput.connection.LocalAddr().(*net.UDPAddr); ok {
ip := addr.IP.String()
expectedAttributes["net.host.ip"] = addr.IP.String()
expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10)
expectedAttributes["net.host.name"] = udpInput.resolver.GetHostFromIp(ip)
}
// LocalAddr for conn is a client (peer) address
if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok {
expectedAttributes["net.peer.ip"] = addr.IP.String()
ip := addr.IP.String()
expectedAttributes["net.peer.ip"] = ip
expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10)
expectedAttributes["net.peer.name"] = udpInput.resolver.GetHostFromIp(ip)
}
require.Equal(t, expectedBody, entry.Body)
require.Equal(t, expectedAttributes, entry.Attributes)
Expand Down
132 changes: 132 additions & 0 deletions operator/helper/ip_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

import (
"net"
"sync"
"time"
)

// cacheEntry keeps information about host and expiration time
type cacheEntry struct {
hostname string
expireTime time.Time
}

const (
defaultInvalidationInterval time.Duration = 5 * time.Minute
)

type IPResolver struct {
cache map[string]cacheEntry
mutex sync.RWMutex
done chan bool
stopped bool
invalidationInterval time.Duration
}

// Create new resolver
func NewIpResolver() *IPResolver {
r := &IPResolver{
cache: make(map[string]cacheEntry),
stopped: false,
done: make(chan bool),
invalidationInterval: defaultInvalidationInterval,
}
r.start()
return r
}

// Stop cache invalidation
func (r *IPResolver) Stop() {
r.mutex.Lock()
if r.stopped {
r.mutex.Unlock()
return
}

r.stopped = true
r.mutex.Unlock()
r.done <- true
}

// start runs cache invalidation every 5 minutes
func (r *IPResolver) start() {
ticker := time.NewTicker(r.invalidationInterval)
go func() {
for {
select {
case <-r.done:
ticker.Stop()
return
case <-ticker.C:
r.mutex.Lock()
r.invalidateCache()
r.mutex.Unlock()
}
}
}()
}

// invalidateCache removes not longer valid entries from cache
func (r *IPResolver) invalidateCache() {
now := time.Now()
for key, entry := range r.cache {
if entry.expireTime.Before(now) {
delete(r.cache, key)
}
}
}

// GetHostFromIp returns hostname for given ip
// It is taken from cache if exists,
// otherwise lookup is performed and result is put into cache
func (r *IPResolver) GetHostFromIp(ip string) (host string) {
r.mutex.RLock()
entry, ok := r.cache[ip]
if ok {
host = entry.hostname
defer r.mutex.RUnlock()
return host
}
r.mutex.RUnlock()

host = r.lookupIpAddr(ip)

r.mutex.Lock()
r.cache[ip] = cacheEntry{
hostname: host,
expireTime: time.Now().Add(5 * time.Minute),
}
r.mutex.Unlock()

return host
}

// lookupIpAddr resturns hostname based on ip address
func (r *IPResolver) lookupIpAddr(ip string) (host string) {
res, err := net.LookupAddr(ip)
if err != nil || len(res) == 0 {
return ip
}

host = res[0]
// Trim one trailing '.'.
if last := len(host) - 1; last >= 0 && host[last] == '.' {
host = host[:last]
}
return host
}
75 changes: 75 additions & 0 deletions operator/helper/ip_resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

import (
"fmt"
"testing"
"time"
"unsafe"

"github.com/stretchr/testify/require"
)

func TestIPResolverCacheLookup(t *testing.T) {
resolver := NewIpResolver()
resolver.cache["127.0.0.1"] = cacheEntry{
hostname: "definitely invalid hostname",
expireTime: time.Now().Add(time.Hour),
}

require.Equal(t, "definitely invalid hostname", resolver.GetHostFromIp("127.0.0.1"))
}

func TestIPResolverCacheInvalidation(t *testing.T) {
resolver := NewIpResolver()

resolver.cache["127.0.0.1"] = cacheEntry{
hostname: "definitely invalid hostname",
expireTime: time.Now().Add(-1 * time.Hour),
}

resolver.Stop()
resolver.invalidateCache()

hostname := resolver.lookupIpAddr("127.0.0.1")
require.Equal(t, hostname, resolver.GetHostFromIp("127.0.0.1"))
}

func TestIPResolver100Hits(t *testing.T) {
resolver := NewIpResolver()
resolver.cache["127.0.0.1"] = cacheEntry{
hostname: "definitely invalid hostname",
expireTime: time.Now().Add(time.Hour),
}

for i := 0; i < 100; i++ {
require.Equal(t, "definitely invalid hostname", resolver.GetHostFromIp("127.0.0.1"))
}
}

func TestIPResolverWithMultipleStops(t *testing.T) {
resolver := NewIpResolver()

resolver.Stop()
resolver.Stop()
}

func TestSizes(t *testing.T) {
fmt.Printf("string %v \n", unsafe.Sizeof(""))
fmt.Printf("cache entry %v \n", unsafe.Sizeof(cacheEntry{}))
fmt.Printf("time %v \n", unsafe.Sizeof(time.Now()))
fmt.Printf("time %v \n", unsafe.Sizeof(time.Now()))
}

0 comments on commit 8b85a69

Please sign in to comment.