Cherry-pick #19552 to 7.x: Add max bytes in line limit #20081

Merged: 1 commit merged on Jul 21, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -247,6 +247,7 @@ field. You can revert this change by configuring tags for the module and omitting
- Fix memory leak in tcp and unix input sources. {pull}19459[19459]
- Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149]
- Fix bug with empty filter values in system/service {pull}19812[19812]
- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552]

*Heartbeat*

9 changes: 9 additions & 0 deletions filebeat/input/log/harvester.go
@@ -631,6 +631,8 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
var r reader.Reader
var err error

logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
@@ -644,10 +646,17 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

// Configure the MaxBytes limit for the EncodeReader as 4 times the configured value,
// to cover the worst-case scenario where incoming UTF-32 characters are decoded to single-byte UTF-8 characters.
// This limit serves primarily to avoid memory bloat or a potential OOM with unexpectedly long lines in the file.
// Further size limiting is performed by LimitReader at the end of the reader pipeline as needed.
encReaderMaxBytes := h.config.MaxBytes * 4

r, err = readfile.NewEncodeReader(reader, readfile.Config{
Codec: h.encoding,
BufferSize: h.config.BufferSize,
Terminator: h.config.LineTerminator,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
return nil, err
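The factor of 4 applied to MaxBytes above covers the worst-case expansion between the on-disk encoding and the decoded UTF-8 stream: every UTF-32 code point occupies four bytes, yet an ASCII code point decodes to a single UTF-8 byte, so a line that fits within the configured limit after decoding can be up to four times larger in the raw input. A minimal standalone sketch (not part of this PR) demonstrating that 4:1 ratio with the golang.org/x/text UTF-32 decoder:

```go
package main

import (
	"fmt"

	"golang.org/x/text/encoding/unicode/utf32"
)

func main() {
	// "hi" encoded as UTF-32LE: four bytes per character, eight bytes in total.
	raw := []byte{'h', 0, 0, 0, 'i', 0, 0, 0}

	// Decode to UTF-8; each ASCII character shrinks to a single byte.
	dec := utf32.UTF32(utf32.LittleEndian, utf32.IgnoreBOM).NewDecoder()
	out, err := dec.Bytes(raw)
	if err != nil {
		panic(err)
	}

	// Prints: encoded=8 bytes, decoded=2 bytes (the 4:1 worst case).
	fmt.Printf("encoded=%d bytes, decoded=%d bytes\n", len(raw), len(out))
}
```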
1 change: 1 addition & 0 deletions libbeat/reader/readfile/encode.go
@@ -38,6 +38,7 @@ type Config struct {
Codec encoding.Encoding
BufferSize int
Terminator LineTerminator
MaxBytes int
}

// New creates a new Encode reader from input reader by applying
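With the new MaxBytes field on readfile.Config, callers outside the harvester can apply the same per-line cap. A hedged usage sketch (not taken from the PR), assuming the 7.x APIs behave as shown in this diff: that NewEncodeReader accepts an io.Reader plus a Config, that readfile.LineFeed is a valid Terminator value, and that Next() returns a reader.Message whose Content holds one decoded line:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/elastic/beats/v7/libbeat/reader/readfile"
	"golang.org/x/text/encoding/unicode"
)

func main() {
	// One short line, one oversized line, then another short line.
	input := strings.NewReader(
		"short line\n" + strings.Repeat("x", 5000) + "\n" + "another short line\n")

	// Mirror the harvester: give the encode reader 4x the configured limit;
	// the final cap is applied later in the reader pipeline (e.g. by LimitReader).
	const maxBytes = 1024
	r, err := readfile.NewEncodeReader(input, readfile.Config{
		Codec:      unicode.UTF8,
		BufferSize: 512,
		Terminator: readfile.LineFeed, // assumed LineTerminator constant in 7.x
		MaxBytes:   maxBytes * 4,
	})
	if err != nil {
		panic(err)
	}

	for {
		msg, err := r.Next()
		if err != nil {
			break // io.EOF (or a similar error) once the input is exhausted
		}
		// The 5000-byte line exceeds the limit and is skipped with a warning.
		fmt.Printf("read a line of %d bytes\n", len(msg.Content))
	}
}
```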
86 changes: 78 additions & 8 deletions libbeat/reader/readfile/line.go
@@ -28,12 +28,15 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

const unlimited = 0

// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformed files
nl []byte
decodedNl []byte
inBuffer *streambuf.Buffer
@@ -62,6 +65,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
return &LineReader{
reader: input,
bufferSize: config.BufferSize,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
@@ -121,17 +125,17 @@ func (r *LineReader) advance() error {
// Initial check if buffer has already a newLine character
idx := r.inBuffer.IndexFrom(r.inOffset, r.nl)

// fill inBuffer until newline sequence has been found in input buffer
// Fill inBuffer until newline sequence has been found in input buffer
for idx == -1 {
// increase search offset to reduce iterations on buffer when looping
// Increase search offset to reduce iterations on buffer when looping
newOffset := r.inBuffer.Len() - len(r.nl)
if newOffset > r.inOffset {
r.inOffset = newOffset
}

buf := make([]byte, r.bufferSize)

// try to read more bytes into buffer
// Try to read more bytes into buffer
n, err := r.reader.Read(buf)

// Appends buffer also in case of err
@@ -140,16 +144,39 @@ func (r *LineReader) advance() error {
return err
}

// empty read => return buffer error (more bytes required error)
// Empty read => return buffer error (more bytes required error)
if n == 0 {
return streambuf.ErrNoMoreBytes
}

// Check if buffer has newLine character
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)

// If max bytes limit per line is set, then drop the lines that are longer
if r.maxBytes != 0 {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
if err != nil {
r.logger.Error("Error skipping until new line, err:", err)
return err
}
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}
}
}

// found encoded byte sequence for newline in buffer
// Found encoded byte sequence for newline in buffer
// -> decode input sequence into outBuffer
sz, err := r.decode(idx + len(r.nl))
if err != nil {
@@ -158,20 +185,63 @@ func (r *LineReader) advance() error {
sz = idx + len(r.nl)
}

// consume transformed bytes from input buffer
// Consume transformed bytes from input buffer
err = r.inBuffer.Advance(sz)
r.inBuffer.Reset()

// continue scanning input buffer from last position + 1
// Continue scanning input buffer from last position + 1
r.inOffset = idx + 1 - sz
if r.inOffset < 0 {
// fix inOffset if newline has encoding > 8bits + first line has been decoded
// Fix inOffset if newline has encoding > 8bits + first line has been decoded
r.inOffset = 0
}

return err
}

func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
// The length of the line skipped
skipped := r.inBuffer.Len()

// Clean up the buffer
err := r.inBuffer.Advance(skipped)
r.inBuffer.Reset()

// Reset inOffset
r.inOffset = 0

if err != nil {
return 0, err
}

// Read until the new line is found
for idx := -1; idx == -1; {
n, err := r.reader.Read(buf)

// Check bytes read for newLine
if n > 0 {
idx = bytes.Index(buf[:n], r.nl)

if idx != -1 {
r.inBuffer.Append(buf[idx+len(r.nl) : n])
skipped += idx
} else {
skipped += n
}
}

if err != nil {
return skipped, err
}

if n == 0 {
return skipped, streambuf.ErrNoMoreBytes
}
}

return skipped, nil
}

func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
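For reference, a simplified standalone analogue (not the Beats implementation itself) of the skipUntilNewLine loop added above: read fixed-size chunks from the underlying reader, discard bytes until a newline is found, and report how many bytes were skipped together with any remainder that followed the newline. The helper name and signature here are illustrative only.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// skipUntilNewline discards input until the next newline and returns the number
// of bytes skipped plus whatever followed the newline in the last chunk read.
func skipUntilNewline(r io.Reader, bufSize int) (int, []byte, error) {
	buf := make([]byte, bufSize)
	nl := []byte("\n")
	skipped := 0

	for {
		n, err := r.Read(buf)
		if n > 0 {
			if idx := bytes.Index(buf[:n], nl); idx != -1 {
				// Count only the bytes that belonged to the oversized line and
				// hand back the data that followed the newline in this chunk.
				rest := append([]byte(nil), buf[idx+len(nl):n]...)
				return skipped + idx, rest, nil
			}
			skipped += n
		}
		if err != nil {
			return skipped, nil, err
		}
	}
}

func main() {
	input := strings.NewReader(strings.Repeat("x", 100) + "\nnext line\n")
	skipped, rest, err := skipUntilNewline(input, 16)
	// Prints: skipped=100 rest="next line\n" err=<nil>
	fmt.Printf("skipped=%d rest=%q err=%v\n", skipped, rest, err)
}
```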