diff --git a/docs/operators/tcp_input.md b/docs/operators/tcp_input.md index 69d88add1fff..493fb61af303 100644 --- a/docs/operators/tcp_input.md +++ b/docs/operators/tcp_input.md @@ -4,16 +4,18 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `tcp_input` | A unique identifier for the operator | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | -| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input | -| `listen_address` | required | A listen address of the form `:` | -| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) | -| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | -| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | -| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `tcp_input` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input | +| `listen_address` | required | A listen address of the form `:` | +| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section) | +| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | +| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | +| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | +| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes | + #### TLS Configuration diff --git a/docs/operators/udp_input.md b/docs/operators/udp_input.md index c5ba73438108..b241e097f1c2 100644 --- a/docs/operators/udp_input.md +++ b/docs/operators/udp_input.md @@ -4,14 +4,15 @@ The `udp_input` operator listens for logs from UDP packets. ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `udp_input` | A unique identifier for the operator | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | -| `listen_address` | required | A listen address of the form `:` | -| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | -| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | -| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `udp_input` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `listen_address` | required | A listen address of the form `:` | +| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | +| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | +| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | +| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes | ### Example Configurations diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 8b1fa3892f64..4b4b24371459 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -21,6 +21,7 @@ import ( "crypto/tls" "fmt" "net" + "strconv" "sync" "time" @@ -58,6 +59,7 @@ type TCPInputConfig struct { MaxBufferSize helper.ByteSize `json:"max_buffer_size,omitempty" yaml:"max_buffer_size,omitempty"` ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"` TLS *helper.TLSServerConfig `json:"tls,omitempty" yaml:"tls,omitempty"` + AddAttributes bool `json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"` } // Build will build a tcp input operator. @@ -90,6 +92,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato InputOperator: inputOperator, address: c.ListenAddress, maxBufferSize: int(c.MaxBufferSize), + addAttributes: c.AddAttributes, } if c.TLS != nil { @@ -107,6 +110,7 @@ type TCPInput struct { helper.InputOperator address string maxBufferSize int + addAttributes bool listener net.Listener cancel context.CancelFunc @@ -206,6 +210,20 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c t.Errorw("Failed to create entry", zap.Error(err)) continue } + + if t.addAttributes { + entry.AddAttribute("net.transport", "IP.TCP") + if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { + entry.AddAttribute("net.peer.ip", addr.IP.String()) + entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) + } + + if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok { + entry.AddAttribute("net.host.ip", addr.IP.String()) + entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) + } + } + t.Write(ctx, entry) } if err := scanner.Err(); err != nil { diff --git a/operator/builtin/input/tcp/tcp_test.go b/operator/builtin/input/tcp/tcp_test.go index 42b00d9e2c4f..0d0c2730c5ad 100644 --- a/operator/builtin/input/tcp/tcp_test.go +++ b/operator/builtin/input/tcp/tcp_test.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "net" "os" + "strconv" "testing" "time" @@ -130,6 +131,66 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) { } } +func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T) { + return func(t *testing.T) { + cfg := NewTCPInputConfig("test_id") + cfg.ListenAddress = ":0" + cfg.AddAttributes = true + + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + + mockOutput := testutil.Operator{} + tcpInput := op.(*TCPInput) + tcpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput} + + entryChan := make(chan *entry.Entry, 1) + mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + entryChan <- args.Get(1).(*entry.Entry) + }).Return(nil) + + err = tcpInput.Start(testutil.NewMockPersister("test")) + require.NoError(t, err) + defer tcpInput.Stop() + + conn, err := net.Dial("tcp", tcpInput.listener.Addr().String()) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.Write(input) + require.NoError(t, err) + + for _, expectedMessage := range expected { + select { + case entry := <-entryChan: + expectedAttributes := map[string]string{ + "net.transport": "IP.TCP", + } + if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { + expectedAttributes["net.host.ip"] = addr.IP.String() + expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10) + } + if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok { + expectedAttributes["net.peer.ip"] = addr.IP.String() + expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10) + } + require.Equal(t, expectedMessage, entry.Body) + require.Equal(t, expectedAttributes, entry.Attributes) + case <-time.After(time.Second): + require.FailNow(t, "Timed out waiting for message to be written") + } + } + + select { + case entry := <-entryChan: + require.FailNow(t, "Unexpected entry: %s", entry) + case <-time.After(100 * time.Millisecond): + return + } + } +} + func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) { return func(t *testing.T) { f, err := os.Create("test.crt") @@ -282,6 +343,11 @@ func TestTcpInput(t *testing.T) { t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"})) } +func TestTcpInputAattributes(t *testing.T) { + t.Run("Simple", tcpInputAttributesTest([]byte("message\n"), []string{"message"})) + t.Run("CarriageReturn", tcpInputAttributesTest([]byte("message\r\n"), []string{"message"})) +} + func TestTLSTcpInput(t *testing.T) { t.Run("Simple", tlsTCPInputTest([]byte("message\n"), []string{"message"})) t.Run("CarriageReturn", tlsTCPInputTest([]byte("message\r\n"), []string{"message"})) diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 430c60e53a3e..6888a044a84a 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "net" + "strconv" "sync" "go.uber.org/zap" @@ -42,6 +43,7 @@ type UDPInputConfig struct { helper.InputConfig `yaml:",inline"` ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"` + AddAttributes bool `json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"` } // Build will build a udp input operator. @@ -64,6 +66,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato InputOperator: inputOperator, address: address, buffer: make([]byte, 8192), + addAttributes: c.AddAttributes, } return []operator.Operator{udpInput}, nil } @@ -72,7 +75,8 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato type UDPInput struct { buffer []byte helper.InputOperator - address *net.UDPAddr + address *net.UDPAddr + addAttributes bool connection net.PacketConn cancel context.CancelFunc @@ -102,7 +106,7 @@ func (u *UDPInput) goHandleMessages(ctx context.Context) { defer u.wg.Done() for { - message, err := u.readMessage() + message, remoteAddr, err := u.readMessage() if err != nil { select { case <-ctx.Done(): @@ -119,23 +123,36 @@ func (u *UDPInput) goHandleMessages(ctx context.Context) { continue } + if u.addAttributes { + entry.AddAttribute("net.transport", "IP.UDP") + if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok { + entry.AddAttribute("net.host.ip", addr.IP.String()) + entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) + } + + if addr, ok := remoteAddr.(*net.UDPAddr); ok { + entry.AddAttribute("net.peer.ip", addr.IP.String()) + entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) + } + } + u.Write(ctx, entry) } }() } // readMessage will read log messages from the connection. -func (u *UDPInput) readMessage() (string, error) { - n, _, err := u.connection.ReadFrom(u.buffer) +func (u *UDPInput) readMessage() (string, net.Addr, error) { + n, addr, err := u.connection.ReadFrom(u.buffer) if err != nil { - return "", err + return "", nil, err } // Remove trailing characters and NULs for ; (n > 0) && (u.buffer[n-1] < 32); n-- { } - return string(u.buffer[:n]), nil + return string(u.buffer[:n]), addr, nil } // Stop will stop listening for udp messages. diff --git a/operator/builtin/input/udp/udp_test.go b/operator/builtin/input/udp/udp_test.go index 89aab4b2dd95..8e854b8f98f1 100644 --- a/operator/builtin/input/udp/udp_test.go +++ b/operator/builtin/input/udp/udp_test.go @@ -16,6 +16,7 @@ package udp import ( "net" + "strconv" "testing" "time" @@ -76,6 +77,70 @@ func udpInputTest(input []byte, expected []string) func(t *testing.T) { } } +func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T) { + return func(t *testing.T) { + cfg := NewUDPInputConfig("test_input") + cfg.ListenAddress = ":0" + cfg.AddAttributes = true + + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + + mockOutput := testutil.Operator{} + udpInput, ok := op.(*UDPInput) + require.True(t, ok) + + udpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput} + + entryChan := make(chan *entry.Entry, 1) + mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + entryChan <- args.Get(1).(*entry.Entry) + }).Return(nil) + + err = udpInput.Start(testutil.NewMockPersister("test")) + require.NoError(t, err) + defer udpInput.Stop() + + conn, err := net.Dial("udp", udpInput.connection.LocalAddr().String()) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.Write(input) + require.NoError(t, err) + + for _, expectedBody := range expected { + select { + case entry := <-entryChan: + expectedAttributes := map[string]string{ + "net.transport": "IP.UDP", + } + // LocalAddr for udpInput.connection is a server address + if addr, ok := udpInput.connection.LocalAddr().(*net.UDPAddr); ok { + expectedAttributes["net.host.ip"] = addr.IP.String() + expectedAttributes["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10) + } + // LocalAddr for conn is a client (peer) address + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + expectedAttributes["net.peer.ip"] = addr.IP.String() + expectedAttributes["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10) + } + require.Equal(t, expectedBody, entry.Body) + require.Equal(t, expectedAttributes, entry.Attributes) + case <-time.After(time.Second): + require.FailNow(t, "Timed out waiting for message to be written") + } + } + + select { + case entry := <-entryChan: + require.FailNow(t, "Unexpected entry: %s", entry) + case <-time.After(100 * time.Millisecond): + return + } + } +} + func TestUDPInput(t *testing.T) { t.Run("Simple", udpInputTest([]byte("message1"), []string{"message1"})) t.Run("TrailingNewlines", udpInputTest([]byte("message1\n"), []string{"message1"})) @@ -83,6 +148,13 @@ func TestUDPInput(t *testing.T) { t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"})) } +func TestUDPInputAttributes(t *testing.T) { + t.Run("Simple", udpInputAttributesTest([]byte("message1"), []string{"message1"})) + t.Run("TrailingNewlines", udpInputAttributesTest([]byte("message1\n"), []string{"message1"})) + t.Run("TrailingCRNewlines", udpInputAttributesTest([]byte("message1\r\n"), []string{"message1"})) + t.Run("NewlineInMessage", udpInputAttributesTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"})) +} + func BenchmarkUdpInput(b *testing.B) { cfg := NewUDPInputConfig("test_id") cfg.ListenAddress = ":0"