diff --git a/docs/operators/udp_input.md b/docs/operators/udp_input.md index b241e097..f6ad7abb 100644 --- a/docs/operators/udp_input.md +++ b/docs/operators/udp_input.md @@ -4,27 +4,56 @@ The `udp_input` operator listens for logs from UDP packets. ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `udp_input` | A unique identifier for the operator | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | -| `listen_address` | required | A listen address of the form `:` | -| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | -| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | -| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `udp_input` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `listen_address` | required | A listen address of the form `:` | +| `write_to` | $ | The body [field](/docs/types/field.md) written to when creating a new log entry | +| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | +| `resource` | {} | A map of `key: value` pairs to add to the entry's resource | +| `add_attributes` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` attributes | +| `multiline` | | A `multiline` configuration block. See below for details | +| `encoding` | `nop` | The encoding of the file being read. 
See the list of supported encodings below for available options | + +#### `multiline` configuration + +If set, the `multiline` configuration block instructs the `udp_input` operator to split log entries on a pattern other than newlines. + +**note** If `multiline` is not set, log entries are not split at all: every UDP packet is treated as a single log entry. +**note** `multiline` detection works per UDP packet due to protocol limitations. + +The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that +match either the beginning of a new log entry, or the end of a log entry. + +#### Supported encodings + +| Key | Description | +| --- | --- | +| `nop` | No encoding validation. Treats the input as a stream of raw bytes | +| `utf-8` | UTF-8 encoding | +| `utf-16le` | UTF-16 encoding with little-endian byte order | +| `utf-16be` | UTF-16 encoding with big-endian byte order | +| `ascii` | ASCII encoding | +| `big5` | The Big5 Chinese character encoding | + +Other less common encodings are supported on a best-effort basis. +See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml) +for other encodings available. 
### Example Configurations #### Simple Configuration: + ```yaml - type: udp_input listen_adress: "0.0.0.0:54526" ``` Send a log: + ```bash $ nc -u localhost 54525 < message1 @@ -33,6 +62,7 @@ heredoc> EOF ``` Generated entries: + ```json { "timestamp": "2020-04-30T12:10:17.656726-04:00", diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 077795ca..72e80047 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -32,9 +32,9 @@ import ( ) const ( - // minBufferSize is the initial size used for buffering + // minMaxLogSize is the minimal size which can be used for buffering // TCP input - minBufferSize = 64 * 1024 + minMaxLogSize = 64 * 1024 // DefaultMaxLogSize is the max buffer sized used // if MaxLogSize is not set @@ -79,8 +79,8 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato c.MaxLogSize = DefaultMaxLogSize } - if c.MaxLogSize < minBufferSize { - return nil, fmt.Errorf("invalid value for parameter 'max_log_size', must be equal to or greater than %d bytes", minBufferSize) + if c.MaxLogSize < minMaxLogSize { + return nil, fmt.Errorf("invalid value for parameter 'max_log_size', must be equal to or greater than %d bytes", minMaxLogSize) } if c.ListenAddress == "" { @@ -219,10 +219,9 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c defer t.wg.Done() defer cancel() - // Initial buffer size is 64k - buf := make([]byte, 0, 64*1024) + buf := make([]byte, 0, t.MaxLogSize) scanner := bufio.NewScanner(conn) - scanner.Buffer(buf, t.MaxLogSize*1024) + scanner.Buffer(buf, t.MaxLogSize) scanner.Split(t.splitFunc) diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 6888a044..72c76d6b 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -15,6 +15,8 @@ package udp import ( + "bufio" + "bytes" "context" "fmt" "net" @@ -27,6 +29,11 @@ import ( 
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper" ) +const ( + // Maximum UDP packet size + MaxUDPSize = 64 * 1024 +) + func init() { operator.Register("udp_input", func() operator.Builder { return NewUDPInputConfig("") }) } @@ -35,6 +42,11 @@ func init() { func NewUDPInputConfig(operatorID string) *UDPInputConfig { return &UDPInputConfig{ InputConfig: helper.NewInputConfig(operatorID, "udp_input"), + Encoding: helper.NewEncodingConfig(), + Multiline: helper.MultilineConfig{ + LineStartPattern: "", + LineEndPattern: ".^", // Use never matching regex to not split data by default + }, } } @@ -42,8 +54,10 @@ func NewUDPInputConfig(operatorID string) *UDPInputConfig { type UDPInputConfig struct { helper.InputConfig `yaml:",inline"` - ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"` - AddAttributes bool `json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"` + ListenAddress string `mapstructure:"listen_address,omitempty" json:"listen_address,omitempty" yaml:"listen_address,omitempty"` + AddAttributes bool `mapstructure:"add_attributes,omitempty" json:"add_attributes,omitempty" yaml:"add_attributes,omitempty"` + Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` + Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` } // Build will build a udp input operator. 
@@ -62,11 +76,23 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, fmt.Errorf("failed to resolve listen_address: %s", err) } + encoding, err := c.Encoding.Build(context) + if err != nil { + return nil, err + } + + splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true) + if err != nil { + return nil, err + } + udpInput := &UDPInput{ InputOperator: inputOperator, address: address, - buffer: make([]byte, 8192), + buffer: make([]byte, MaxUDPSize), addAttributes: c.AddAttributes, + encoding: encoding, + splitFunc: splitFunc, } return []operator.Operator{udpInput}, nil } @@ -81,6 +107,9 @@ type UDPInput struct { connection net.PacketConn cancel context.CancelFunc wg sync.WaitGroup + + encoding helper.Encoding + splitFunc bufio.SplitFunc } // Start will start listening for messages on a socket. @@ -117,42 +146,59 @@ func (u *UDPInput) goHandleMessages(ctx context.Context) { break } - entry, err := u.NewEntry(message) - if err != nil { - u.Errorw("Failed to create entry", zap.Error(err)) - continue - } + buf := make([]byte, 0, MaxUDPSize) + scanner := bufio.NewScanner(bytes.NewReader(message)) + scanner.Buffer(buf, MaxUDPSize) - if u.addAttributes { - entry.AddAttribute("net.transport", "IP.UDP") - if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok { - entry.AddAttribute("net.host.ip", addr.IP.String()) - entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) + scanner.Split(u.splitFunc) + + for scanner.Scan() { + decoded, err := u.encoding.Decode(scanner.Bytes()) + if err != nil { + u.Errorw("Failed to decode data", zap.Error(err)) + continue } - if addr, ok := remoteAddr.(*net.UDPAddr); ok { - entry.AddAttribute("net.peer.ip", addr.IP.String()) - entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) + entry, err := u.NewEntry(decoded) + if err != nil { + u.Errorw("Failed to create entry", zap.Error(err)) + continue } - } - u.Write(ctx, entry) + if 
u.addAttributes { + entry.AddAttribute("net.transport", "IP.UDP") + if addr, ok := u.connection.LocalAddr().(*net.UDPAddr); ok { + entry.AddAttribute("net.host.ip", addr.IP.String()) + entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10)) + } + + if addr, ok := remoteAddr.(*net.UDPAddr); ok { + entry.AddAttribute("net.peer.ip", addr.IP.String()) + entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10)) + } + } + + u.Write(ctx, entry) + } + if err := scanner.Err(); err != nil { + u.Errorw("Scanner error", zap.Error(err)) + } } }() } // readMessage will read log messages from the connection. -func (u *UDPInput) readMessage() (string, net.Addr, error) { +func (u *UDPInput) readMessage() ([]byte, net.Addr, error) { n, addr, err := u.connection.ReadFrom(u.buffer) if err != nil { - return "", nil, err + return nil, nil, err } // Remove trailing characters and NULs for ; (n > 0) && (u.buffer[n-1] < 32); n-- { } - return string(u.buffer[:n]), addr, nil + return u.buffer[:n], addr, nil } // Stop will stop listening for udp messages. diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 7c786e1b..a6820f9f 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -77,6 +77,10 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { firstLoc := re.FindIndex(data) if firstLoc == nil { + // Flush if no more data is expected + if len(data) != 0 && atEOF && flushAtEOF { + return len(data), data, nil + } return 0, nil, nil // read more data and try again. } firstMatchStart := firstLoc[0]