Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Add encoding, max_log_size and multiline to udp #127

Merged
merged 9 commits into from
May 12, 2021
Merged
48 changes: 39 additions & 9 deletions docs/operators/udp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,56 @@ The `udp_input` operator listens for logs from UDP packets.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `udp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `udp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `nop` | The encoding of the data being read. See the list of supported encodings below for available options |

#### `multiline` configuration

If set, the `multiline` configuration block instructs the `udp_input` operator to split log entries on a pattern other than newlines.

**note** If `multiline` is not set at all, it won't split log entries at all. Every UDP packet is going to be treated as a single log entry.
**note** `multiline` detection works per UDP packet due to protocol limitations.

The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
match either the beginning of a new log entry, or the end of a log entry.

#### Supported encodings

| Key | Description |
| --- | --- |
| `nop` | No encoding validation. Treats the data as a stream of raw bytes |
| `utf-8` | UTF-8 encoding |
| `utf-16le` | UTF-16 encoding with little-endian byte order |
| `utf-16be` | UTF-16 encoding with big-endian byte order |
| `ascii` | ASCII encoding |
| `big5` | The Big5 Chinese character encoding |

Other less common encodings are supported on a best-effort basis.
See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml)
for other encodings available.

### Example Configurations

#### Simple

Configuration:

```yaml
- type: udp_input
  listen_address: "0.0.0.0:54526"
```

Send a log:

```bash
$ nc -u localhost 54526 <<EOF
heredoc> message1
Expand All @@ -33,6 +62,7 @@ heredoc> EOF
```

Generated entries:

```json
{
"timestamp": "2020-04-30T12:10:17.656726-04:00",
Expand Down
13 changes: 6 additions & 7 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
)

const (
// minBufferSize is the initial size used for buffering
// minMaxLogSize is the minimal size which can be used for buffering
// TCP input
minBufferSize = 64 * 1024
minMaxLogSize = 64 * 1024

// DefaultMaxLogSize is the max buffer size used
// if MaxLogSize is not set
Expand Down Expand Up @@ -79,8 +79,8 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
c.MaxLogSize = DefaultMaxLogSize
}

if c.MaxLogSize < minBufferSize {
return nil, fmt.Errorf("invalid value for parameter 'max_log_size', must be equal to or greater than %d bytes", minBufferSize)
if c.MaxLogSize < minMaxLogSize {
return nil, fmt.Errorf("invalid value for parameter 'max_log_size', must be equal to or greater than %d bytes", minMaxLogSize)
}

if c.ListenAddress == "" {
Expand Down Expand Up @@ -219,10 +219,9 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c
defer t.wg.Done()
defer cancel()

// Initial buffer size is 64k
buf := make([]byte, 0, 64*1024)
buf := make([]byte, 0, t.MaxLogSize)
scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, t.MaxLogSize*1024)
scanner.Buffer(buf, t.MaxLogSize)

scanner.Split(t.splitFunc)

Expand Down
89 changes: 68 additions & 21 deletions operator/builtin/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package udp

import (
"bufio"
"bytes"
"context"
"fmt"
"net"
Expand All @@ -27,6 +29,11 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

// MaxUDPSize is the maximum UDP packet size, in bytes (64 KiB).
const MaxUDPSize = 64 << 10

// init registers the "udp_input" operator type with the operator registry.
// The registered builder returns a default UDP input config with an empty
// operator ID.
func init() {
	newBuilder := func() operator.Builder {
		return NewUDPInputConfig("")
	}
	operator.Register("udp_input", newBuilder)
}
Expand All @@ -35,15 +42,23 @@ func init() {
// NewUDPInputConfig returns a UDPInputConfig for the given operator ID,
// populated with the default input, encoding and multiline settings.
func NewUDPInputConfig(operatorID string) *UDPInputConfig {
	cfg := new(UDPInputConfig)

	cfg.InputConfig = helper.NewInputConfig(operatorID, "udp_input")
	cfg.Encoding = helper.NewEncodingConfig()

	// By default the end pattern is a regex that can never match, so the
	// data is not split unless the user configures a multiline pattern.
	cfg.Multiline.LineStartPattern = ""
	cfg.Multiline.LineEndPattern = ".^"

	return cfg
}

// UDPInputConfig is the configuration of a udp input operator.
type UDPInputConfig struct {
helper.InputConfig `yaml:",inline"`

ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
AddAttributes bool `json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"`
ListenAddress string `mapstructure:"listen_address,omitempty" json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty" json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
sumo-drosiek marked this conversation as resolved.
Show resolved Hide resolved
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
}

// Build will build a udp input operator.
Expand All @@ -62,11 +77,23 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, fmt.Errorf("failed to resolve listen_address: %s", err)
}

encoding, err := c.Encoding.Build(context)
if err != nil {
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true)
if err != nil {
return nil, err
}

udpInput := &UDPInput{
InputOperator: inputOperator,
address: address,
buffer: make([]byte, 8192),
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
}
return []operator.Operator{udpInput}, nil
}
Expand All @@ -81,6 +108,9 @@ type UDPInput struct {
connection net.PacketConn
cancel context.CancelFunc
wg sync.WaitGroup

encoding helper.Encoding
splitFunc bufio.SplitFunc
}

// Start will start listening for messages on a socket.
Expand Down Expand Up @@ -117,42 +147,59 @@ func (u *UDPInput) goHandleMessages(ctx context.Context) {
break
}

entry, err := u.NewEntry(message)
if err != nil {
u.Errorw("Failed to create entry", zap.Error(err))
continue
}
buf := make([]byte, 0, MaxUDPSize)
scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
scanner.Split(u.splitFunc)

for scanner.Scan() {
decoded, err := u.encoding.Decode(scanner.Bytes())
if err != nil {
u.Errorw("Failed to decode data", zap.Error(err))
continue
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
entry.AddAttribute("net.peer.ip", addr.IP.String())
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry, err := u.NewEntry(decoded)
if err != nil {
u.Errorw("Failed to create entry", zap.Error(err))
continue
}
}

u.Write(ctx, entry)
if u.addAttributes {
entry.AddAttribute("net.transport", "IP.UDP")
if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok {
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
}

if addr, ok := remoteAddr.(*net.UDPAddr); ok {
entry.AddAttribute("net.peer.ip", addr.IP.String())
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
}
}

u.Write(ctx, entry)
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
}
}
}()
}

// readMessage will read log messages from the connection.
func (u *UDPInput) readMessage() (string, net.Addr, error) {
func (u *UDPInput) readMessage() ([]byte, net.Addr, error) {
n, addr, err := u.connection.ReadFrom(u.buffer)
if err != nil {
return "", nil, err
return nil, nil, err
}

// Remove trailing characters and NULs
for ; (n > 0) && (u.buffer[n-1] < 32); n-- {
}

return string(u.buffer[:n]), addr, nil
return u.buffer[:n], addr, nil
}

// Stop will stop listening for udp messages.
Expand Down
4 changes: 4 additions & 0 deletions operator/helper/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
firstLoc := re.FindIndex(data)
if firstLoc == nil {
// Flush if no more data is expected
if len(data) != 0 && atEOF && flushAtEOF {
return len(data), data, nil
}
return 0, nil, nil // read more data and try again.
}
firstMatchStart := firstLoc[0]
Expand Down