Skip to content

Commit

Permalink
Add basic net attributes to tcp/udp input (#108)
Browse files Browse the repository at this point in the history
* tcp: test: add optionally net attributes to tcp operator entries

Signed-off-by: Dominik Rosiek <[email protected]>

* udp: test: add optionally net attributes to udp operator entries

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
sumo-drosiek authored Apr 20, 2021
1 parent b506aad commit 5463069
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 24 deletions.
22 changes: 12 additions & 10 deletions docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes |


#### TLS Configuration

Expand Down
17 changes: 9 additions & 8 deletions docs/operators/udp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ The `udp_input` operator listens for logs from UDP packets.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `udp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `udp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes |

### Example Configurations

Expand Down
18 changes: 18 additions & 0 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/tls"
"fmt"
"net"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -58,6 +59,7 @@ type TCPInputConfig struct {
MaxBufferSize helper.ByteSize `json:"max_buffer_size,omitempty" yaml:"max_buffer_size,omitempty"`
ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
TLS *helper.TLSServerConfig `json:"tls,omitempty" yaml:"tls,omitempty"`
AddAttributes bool `json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"`
}

// Build will build a tcp input operator.
Expand Down Expand Up @@ -90,6 +92,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
InputOperator: inputOperator,
address: c.ListenAddress,
maxBufferSize: int(c.MaxBufferSize),
addAttributes: c.AddAttributes,
}

if c.TLS != nil {
Expand All @@ -107,6 +110,7 @@ type TCPInput struct {
helper.InputOperator
address string
maxBufferSize int
addAttributes bool

listener net.Listener
cancel context.CancelFunc
Expand Down Expand Up @@ -206,6 +210,20 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c
t.Errorw("Failed to create entry", zap.Error(err))
continue
}

if t.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
entry.AddAttribute("net.peer.ip", addr.IP.String())
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
}
}

t.Write(ctx, entry)
}
if err := scanner.Err(); err != nil {
Expand Down
66 changes: 66 additions & 0 deletions operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/tls"
"net"
"os"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -130,6 +131,66 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
cfg.AddAttributes = true

ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
op := ops[0]

mockOutput := testutil.Operator{}
tcpInput := op.(*TCPInput)
tcpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput}

entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
}).Return(nil)

err = tcpInput.Start(testutil.NewMockPersister("test"))
require.NoError(t, err)
defer tcpInput.Stop()

conn, err := net.Dial("tcp", tcpInput.listener.Addr().String())
require.NoError(t, err)
defer conn.Close()

_, err = conn.Write(input)
require.NoError(t, err)

for _, expectedMessage := range expected {
select {
case entry := <-entryChan:
expectedAttributes := map[string]string{
"net.transport": "IP.TCP",
}
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
expectedAttributes["net.host.ip"] = addr.IP.String()
expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10)
}
if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
expectedAttributes["net.peer.ip"] = addr.IP.String()
expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10)
}
require.Equal(t, expectedMessage, entry.Body)
require.Equal(t, expectedAttributes, entry.Attributes)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for message to be written")
}
}

select {
case entry := <-entryChan:
require.FailNow(t, "Unexpected entry: %s", entry)
case <-time.After(100 * time.Millisecond):
return
}
}
}

func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
f, err := os.Create("test.crt")
Expand Down Expand Up @@ -282,6 +343,11 @@ func TestTcpInput(t *testing.T) {
t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"}))
}

func TestTcpInputAattributes(t *testing.T) {
t.Run("Simple", tcpInputAttributesTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tcpInputAttributesTest([]byte("message\r\n"), []string{"message"}))
}

func TestTLSTcpInput(t *testing.T) {
t.Run("Simple", tlsTCPInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tlsTCPInputTest([]byte("message\r\n"), []string{"message"}))
Expand Down
29 changes: 23 additions & 6 deletions operator/builtin/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync"

"go.uber.org/zap"
Expand All @@ -42,6 +43,7 @@ type UDPInputConfig struct {
helper.InputConfig `yaml:",inline"`

ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
AddAttributes bool `json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"`
}

// Build will build a udp input operator.
Expand All @@ -64,6 +66,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
InputOperator: inputOperator,
address: address,
buffer: make([]byte, 8192),
addAttributes: c.AddAttributes,
}
return []operator.Operator{udpInput}, nil
}
Expand All @@ -72,7 +75,8 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
type UDPInput struct {
buffer []byte
helper.InputOperator
address *net.UDPAddr
address *net.UDPAddr
addAttributes bool

connection net.PacketConn
cancel context.CancelFunc
Expand Down Expand Up @@ -102,7 +106,7 @@ func (u *UDPInput) goHandleMessages(ctx context.Context) {
defer u.wg.Done()

for {
message, err := u.readMessage()
message, remoteAddr, err := u.readMessage()
if err != nil {
select {
case <-ctx.Done():
Expand All @@ -119,23 +123,36 @@ func (u *UDPInput) goHandleMessages(ctx context.Context) {
continue
}

if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
entry.AddAttribute("net.peer.ip", addr.IP.String())
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
}
}

u.Write(ctx, entry)
}
}()
}

// readMessage will read log messages from the connection.
func (u *UDPInput) readMessage() (string, error) {
n, _, err := u.connection.ReadFrom(u.buffer)
func (u *UDPInput) readMessage() (string, net.Addr, error) {
n, addr, err := u.connection.ReadFrom(u.buffer)
if err != nil {
return "", err
return "", nil, err
}

// Remove trailing characters and NULs
for ; (n > 0) && (u.buffer[n-1] < 32); n-- {
}

return string(u.buffer[:n]), nil
return string(u.buffer[:n]), addr, nil
}

// Stop will stop listening for udp messages.
Expand Down
72 changes: 72 additions & 0 deletions operator/builtin/input/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package udp

import (
"net"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -76,13 +77,84 @@ func udpInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewUDPInputConfig("test_input")
cfg.ListenAddress = ":0"
cfg.AddAttributes = true

ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
op := ops[0]

mockOutput := testutil.Operator{}
udpInput, ok := op.(*UDPInput)
require.True(t, ok)

udpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput}

entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
}).Return(nil)

err = udpInput.Start(testutil.NewMockPersister("test"))
require.NoError(t, err)
defer udpInput.Stop()

conn, err := net.Dial("udp", udpInput.connection.LocalAddr().String())
require.NoError(t, err)
defer conn.Close()

_, err = conn.Write(input)
require.NoError(t, err)

for _, expectedBody := range expected {
select {
case entry := <-entryChan:
expectedAttributes := map[string]string{
"net.transport": "IP.UDP",
}
// LocalAddr for udpInput.connection is a server address
if addr, ok := udpInput.connection.LocalAddr().(*net.UDPAddr); ok {
expectedAttributes["net.host.ip"] = addr.IP.String()
expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10)
}
// LocalAddr for conn is a client (peer) address
if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok {
expectedAttributes["net.peer.ip"] = addr.IP.String()
expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10)
}
require.Equal(t, expectedBody, entry.Body)
require.Equal(t, expectedAttributes, entry.Attributes)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for message to be written")
}
}

select {
case entry := <-entryChan:
require.FailNow(t, "Unexpected entry: %s", entry)
case <-time.After(100 * time.Millisecond):
return
}
}
}

func TestUDPInput(t *testing.T) {
t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"}))
t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"}))
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}))
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}))
}

func TestUDPInputAttributes(t *testing.T) {
t.Run("Simple", udpInputAttributesTest([]byte("message1"), []string{"message1"}))
t.Run("TrailingNewlines", udpInputAttributesTest([]byte("message1\n"), []string{"message1"}))
t.Run("TrailingCRNewlines", udpInputAttributesTest([]byte("message1\r\n"), []string{"message1"}))
t.Run("NewlineInMessage", udpInputAttributesTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}))
}

func BenchmarkUdpInput(b *testing.B) {
cfg := NewUDPInputConfig("test_id")
cfg.ListenAddress = ":0"
Expand Down

0 comments on commit 5463069

Please sign in to comment.