
[receiver/filelog] Support preserving whitespace for log entries (#18266)

* Allow preserving whitespace
newly12 authored Feb 28, 2023
1 parent 47df00d commit 11f8ed1
Showing 12 changed files with 183 additions and 98 deletions.
11 changes: 11 additions & 0 deletions .chloggen/stanza-preserve-whitespace.yaml
@@ -0,0 +1,11 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
+component: filelogreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Support preserving whitespace for log entries
+
+# One or more tracking issues related to the change
+issues: [18242]
2 changes: 2 additions & 0 deletions pkg/stanza/docs/operators/file_input.md
@@ -18,6 +18,8 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `include_file_path` | `false` | Whether to add the file path as the attribute `log.file.path`. |
| `include_file_name_resolved` | `false` | Whether to add the file name after symlinks resolution as the attribute `log.file.name_resolved`. |
| `include_file_path_resolved` | `false` | Whether to add the file path after symlinks resolution as the attribute `log.file.path_resolved`. |
+| `preserve_leading_whitespaces` | `false` | Whether to preserve leading whitespace. |
+| `preserve_trailing_whitespaces` | `false` | Whether to preserve trailing whitespace. |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. This setting will be ignored if previously read file offsets are retrieved from a persistence mechanism. |
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. |
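
To make the two new options concrete: by default both edges of each token are trimmed, and each flag disables trimming on its side. Below is a minimal, self-contained Go sketch of that behavior (an illustration assuming the `\r\n\t ` cutset used in `pkg/stanza/operator/helper/multiline.go` further down; it is not the library code):

```go
package main

import (
	"bytes"
	"fmt"
)

// trim mimics the operator's per-token whitespace handling: both edges are
// trimmed by default, and each preserve_* flag disables trimming on its side.
func trim(token []byte, preserveLeading, preserveTrailing bool) []byte {
	const cutset = "\r\n\t "
	if !preserveTrailing {
		token = bytes.TrimRight(token, cutset)
	}
	if !preserveLeading {
		token = bytes.TrimLeft(token, cutset)
	}
	return token
}

func main() {
	line := []byte("    at com.example.Main(Main.java:42) \t")
	fmt.Printf("default:           %q\n", trim(line, false, false))
	fmt.Printf("preserve leading:  %q\n", trim(line, true, false))
	fmt.Printf("preserve trailing: %q\n", trim(line, false, true))
	fmt.Printf("preserve both:     %q\n", trim(line, true, true))
}
```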
26 changes: 14 additions & 12 deletions pkg/stanza/docs/operators/tcp_input.md
@@ -4,18 +4,20 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op

### Configuration Fields

-| Field | Default | Description |
-| --- | --- | --- |
-| `id` | `tcp_input` | A unique identifier for the operator. |
-| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
-| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. |
-| `listen_address` | required | A listen address of the form `<ip>:<port>`. |
-| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section). |
-| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
-| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
-| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes). |
-| `multiline` | | A `multiline` configuration block. See below for details. |
-| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
+| Field | Default | Description |
+| --- | --- | --- |
+| `id` | `tcp_input` | A unique identifier for the operator. |
+| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
+| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. |
+| `listen_address` | required | A listen address of the form `<ip>:<port>`. |
+| `tls` | nil | An optional `TLS` configuration (see the TLS configuration section). |
+| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
+| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
+| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes). |
+| `multiline` | | A `multiline` configuration block. See below for details. |
+| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespace. |
+| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespace. |
+| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |

#### TLS Configuration

22 changes: 12 additions & 10 deletions pkg/stanza/docs/operators/udp_input.md
@@ -4,16 +4,18 @@ The `udp_input` operator listens for logs from UDP packets.

### Configuration Fields

-| Field | Default | Description |
-| --- | --- | --- |
-| `id` | `udp_input` | A unique identifier for the operator. |
-| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
-| `listen_address` | required | A listen address of the form `<ip>:<port>`. |
-| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
-| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
-| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes). |
-| `multiline` | | A `multiline` configuration block. See below for details. |
-| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |
+| Field | Default | Description |
+| --- | --- | --- |
+| `id` | `udp_input` | A unique identifier for the operator. |
+| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
+| `listen_address` | required | A listen address of the form `<ip>:<port>`. |
+| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
+| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
+| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/span-general.md#general-network-connection-attributes). |
+| `multiline` | | A `multiline` configuration block. See below for details. |
+| `preserve_leading_whitespaces` | false | Whether to preserve leading whitespace. |
+| `preserve_trailing_whitespaces` | false | Whether to preserve trailing whitespace. |
+| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options. |

#### `multiline` configuration

3 changes: 1 addition & 2 deletions pkg/stanza/fileconsumer/splitter_factory.go
@@ -34,7 +34,6 @@ func newMultilineSplitterFactory(splitter helper.SplitterConfig) *multilineSplit
    return &multilineSplitterFactory{
        SplitterConfig: splitter,
    }
-
}

// Build builds Multiline Splitter struct
@@ -44,7 +43,7 @@ func (factory *multilineSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc,
        return nil, err
    }
    flusher := factory.Flusher.Build()
-   splitter, err := factory.Multiline.Build(enc.Encoding, false, flusher, maxLogSize)
+   splitter, err := factory.Multiline.Build(enc.Encoding, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
    if err != nil {
        return nil, err
    }
2 changes: 1 addition & 1 deletion pkg/stanza/operator/helper/flusher.go
@@ -99,7 +99,7 @@ func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
        if f.ShouldFlush() {
            // Inform flusher that we just flushed
            f.Flushed()
-           token = trimWhitespaces(data)
+           token = trimWhitespacesFunc(data)
            advance = len(data)
            return
        }
71 changes: 51 additions & 20 deletions pkg/stanza/operator/helper/multiline.go
@@ -44,12 +44,12 @@ type MultilineConfig struct {
}

// Build will build a Multiline operator.
-func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) {
-   return c.getSplitFunc(enc, flushAtEOF, force, maxLogSize)
+func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) {
+   return c.getSplitFunc(enc, flushAtEOF, force, maxLogSize, preserveLeadingWhitespaces, preserveTrailingWhitespaces)
}

// getSplitFunc returns a split function for bufio.Scanner based on the configured pattern
-func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) {
+func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) {
    endPattern := c.LineEndPattern
    startPattern := c.LineStartPattern

Expand All @@ -66,7 +66,7 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, fo
case enc == encoding.Nop:
return SplitNone(maxLogSize), nil
case endPattern == "" && startPattern == "":
splitFunc, err = NewNewlineSplitFunc(enc, flushAtEOF)
splitFunc, err = NewNewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))

if err != nil {
return nil, err
Expand All @@ -76,13 +76,13 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, fo
if err != nil {
return nil, fmt.Errorf("compile line end regex: %w", err)
}
splitFunc = NewLineEndSplitFunc(re, flushAtEOF)
splitFunc = NewLineEndSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
case startPattern != "":
re, err := regexp.Compile("(?m)" + c.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %w", err)
}
splitFunc = NewLineStartSplitFunc(re, flushAtEOF)
splitFunc = NewLineStartSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
default:
return nil, fmt.Errorf("unreachable")
}
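
For orientation, here is a hedged sketch of a call site under the new `Build` signature. It assumes only what is visible in this file: the exported `helper.MultilineConfig` with its `LineStartPattern` field and the argument order shown above; passing `nil` for `force` is an assumption.

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
	"golang.org/x/text/encoding/unicode"
)

func main() {
	cfg := helper.MultilineConfig{LineStartPattern: `^\d{4}-\d{2}-\d{2}`}
	// Argument order after this change: enc, flushAtEOF,
	// preserveLeadingWhitespaces, preserveTrailingWhitespaces, force, maxLogSize.
	splitFunc, err := cfg.Build(unicode.UTF8, false, true, false, nil, 1024*1024)
	if err != nil {
		panic(err)
	}
	fmt.Println(splitFunc != nil) // hand the split func to a bufio.Scanner
}
```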
Expand All @@ -96,13 +96,13 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, fo

// NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that start with a match to the regex pattern provided
func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
firstLoc := re.FindIndex(data)
if firstLoc == nil {
// Flush if no more data is expected
if len(data) != 0 && atEOF && flushAtEOF {
token = trimWhitespaces(data)
token = trimFunc(data)
advance = len(data)
return
}
@@ -114,7 +114,7 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
        if firstMatchStart != 0 {
            // the beginning of the file does not match the start pattern, so return a token up to the first match so we don't lose data
            advance = firstMatchStart
-           token = trimWhitespaces(data[0:firstMatchStart])
+           token = trimFunc(data[0:firstMatchStart])

            // return if non-matching pattern is not only whitespaces
            if token != nil {
Expand All @@ -129,7 +129,7 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {

// Flush if no more data is expected
if atEOF && flushAtEOF {
token = trimWhitespaces(data)
token = trimFunc(data)
advance = len(data)
return
}
Expand All @@ -141,22 +141,22 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
}
secondMatchStart := secondLoc[0] + secondLocOfset

advance = secondMatchStart // start scanning at the beginning of the second match
token = trimWhitespaces(data[firstMatchStart:secondMatchStart]) // the token begins at the first match, and ends at the beginning of the second match
advance = secondMatchStart // start scanning at the beginning of the second match
token = trimFunc(data[firstMatchStart:secondMatchStart]) // the token begins at the first match, and ends at the beginning of the second match
err = nil
return
}
}
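
For readers unfamiliar with custom `bufio.SplitFunc`s, the following standalone sketch shows the same idea in simplified form: a token runs from one match of the start pattern to the next, and trimming is delegated to a pluggable function, mirroring the `trimFunc` parameter introduced here. It is illustrative only, not the implementation above:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"regexp"
	"strings"
)

// lineStartSplitFunc groups lines into tokens that each begin at a match of
// the start pattern; trimming is left to the caller-supplied trim function.
func lineStartSplitFunc(re *regexp.Regexp, trim func([]byte) []byte) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (int, []byte, error) {
		first := re.FindIndex(data)
		if first == nil {
			if atEOF && len(data) != 0 {
				return len(data), trim(data), nil // flush the remainder at EOF
			}
			return 0, nil, nil // request more data
		}
		second := re.FindIndex(data[first[1]:])
		if second == nil {
			if atEOF {
				return len(data), trim(data[first[0]:]), nil
			}
			return 0, nil, nil
		}
		end := first[1] + second[0] // token ends where the next entry starts
		return end, trim(data[first[0]:end]), nil
	}
}

func main() {
	input := "2023-02-28 ERROR boom\n  at frame one\n2023-02-28 INFO ok\n"
	scanner := bufio.NewScanner(strings.NewReader(input))
	scanner.Split(lineStartSplitFunc(
		regexp.MustCompile(`(?m)^\d{4}-\d{2}-\d{2}`),
		func(b []byte) []byte { return bytes.TrimRight(b, "\r\n\t ") },
	))
	for scanner.Scan() {
		fmt.Printf("token: %q\n", scanner.Text())
	}
}
```

Multiline entries keep their inner indentation; the trim function only touches the token's outer edges, which is exactly what the new preserve flags control.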

// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that end with a match to the regex pattern provided
-func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
+func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
    return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
        loc := re.FindIndex(data)
        if loc == nil {
            // Flush if no more data is expected
            if len(data) != 0 && atEOF && flushAtEOF {
-               token = trimWhitespaces(data)
+               token = trimFunc(data)
                advance = len(data)
                return
            }
Expand All @@ -170,15 +170,15 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
        }

        advance = loc[1]
-       token = trimWhitespaces(data[:loc[1]])
+       token = trimFunc(data[:loc[1]])
        err = nil
        return
    }
}
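
The end-pattern counterpart can be sketched the same way (again simplified, not the implementation above): a token ends just after each match of the end pattern.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"regexp"
	"strings"
)

// lineEndSplitFunc emits a token each time the end pattern matches, with the
// match included, and trims the token with the supplied function.
func lineEndSplitFunc(re *regexp.Regexp, trim func([]byte) []byte) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (int, []byte, error) {
		loc := re.FindIndex(data)
		if loc == nil {
			if atEOF && len(data) != 0 {
				return len(data), trim(data), nil // flush the remainder at EOF
			}
			return 0, nil, nil // request more data
		}
		return loc[1], trim(data[:loc[1]]), nil
	}
}

func main() {
	input := "first event;\nsecond\nevent;"
	scanner := bufio.NewScanner(strings.NewReader(input))
	scanner.Split(lineEndSplitFunc(
		regexp.MustCompile(`;`),
		func(b []byte) []byte { return bytes.Trim(b, "\r\n\t ") },
	))
	for scanner.Scan() {
		fmt.Printf("token: %q\n", scanner.Text()) // "first event;" then "second\nevent;"
	}
}
```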

// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
// never returns a token using EOF as a terminator
-func NewNewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) {
+func NewNewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFunc) (bufio.SplitFunc, error) {
    newline, err := encodedNewline(enc)
    if err != nil {
        return nil, err
Expand All @@ -198,12 +198,12 @@ func NewNewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool) (bufio.SplitFun
            // We have a full newline-terminated line.
            token = bytes.TrimSuffix(data[:i], carriageReturn)

-           return i + len(newline), trimWhitespaces(token), nil
+           return i + len(newline), trimFunc(token), nil
        }

        // Flush if no more data is expected
        if atEOF && flushAtEOF {
-           token = trimWhitespaces(data)
+           token = trimFunc(data)
            advance = len(data)
            return
        }
Expand All @@ -225,13 +225,44 @@ func encodedCarriageReturn(enc encoding.Encoding) ([]byte, error) {
    return out[:nDst], err
}

-func trimWhitespaces(data []byte) []byte {
+type trimFunc func([]byte) []byte
+
+func noTrim(token []byte) []byte {
+   return token
+}
+
+func trimLeadingWhitespacesFunc(data []byte) []byte {
    // TrimLeft to strip EOF whitespaces in case of using $ in regex
    // For some reason newline and carriage return are being moved to beginning of next log
+   token := bytes.TrimLeft(data, "\r\n\t ")
+   if token == nil {
+       return []byte{}
+   }
+   return token
+}
+
+func trimTrailingWhitespacesFunc(data []byte) []byte {
    // TrimRight to strip all whitespaces from the end of log
-   token := bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n")
+   token := bytes.TrimRight(data, "\r\n\t ")
    if token == nil {
        return []byte{}
    }
    return token
}
+
+func trimWhitespacesFunc(data []byte) []byte {
+   return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data))
+}
+
+func getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) trimFunc {
+   if preserveLeadingWhitespaces && preserveTrailingWhitespaces {
+       return noTrim
+   }
+   if preserveLeadingWhitespaces {
+       return trimTrailingWhitespacesFunc
+   }
+   if preserveTrailingWhitespaces {
+       return trimLeadingWhitespacesFunc
+   }
+   return trimWhitespacesFunc
+}
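
One behavioral detail worth noting from this hunk: the removed `trimWhitespaces` stripped only `\r\n` from the left, while the new default path (`trimWhitespacesFunc`, i.e. `trimLeadingWhitespacesFunc` applied after `trimTrailingWhitespacesFunc`) also strips leading tabs and spaces. A minimal sketch of the two defaults, omitting the `nil` guards of the real functions:

```go
package main

import (
	"bytes"
	"fmt"
)

// oldDefault reproduces the removed trimWhitespaces line.
func oldDefault(data []byte) []byte {
	return bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n")
}

// newDefault reproduces trimWhitespacesFunc: trailing trim, then leading trim.
func newDefault(data []byte) []byte {
	return bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n\t ")
}

func main() {
	token := []byte("\r\n  at com.example.Main(Main.java:42)\n")
	fmt.Printf("old default: %q\n", oldDefault(token)) // keeps the two leading spaces
	fmt.Printf("new default: %q\n", newDefault(token)) // strips them as well
}
```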