From 74ce8965e0236a65d809ba684a6b79c18f1dd484 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Tue, 21 Jul 2020 04:33:47 -0400 Subject: [PATCH] Limit the number of bytes read by LineReader in Filebeat (#19552) (cherry picked from commit 0e049f0749a239eb60eb9cbc75906499ae9008e8) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/harvester.go | 9 ++ libbeat/reader/readfile/encode.go | 1 + libbeat/reader/readfile/line.go | 86 ++++++++++++-- libbeat/reader/readfile/line_test.go | 169 ++++++++++++++++++++++++--- 5 files changed, 244 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ea336f2e1ed..5e1637b7678 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -247,6 +247,7 @@ field. You can revert this change by configuring tags for the module and omittin - Fix memory leak in tcp and unix input sources. {pull}19459[19459] - Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149] - Fix bug with empty filter values in system/service {pull}19812[19812] +- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 95043e94237..60c94dc3cb5 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -631,6 +631,8 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { var r reader.Reader var err error + logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes) + // TODO: NewLineReader uses additional buffering to deal with encoding and testing // for new lines in input stream. Simple 8-bit based encodings, or plain // don't require 'complicated' logic. 
@@ -644,10 +646,17 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { return nil, err } + // Configure MaxBytes limit for EncodeReader as multiplied by 4 + // for the worst case scenario where incoming UTF32 characters are decoded to the single byte UTF-8 characters. + // This limit serves primarily to avoid memory bloat or potential OOM with unexpectedly long lines in the file. + // The further size limiting is performed by LimitReader at the end of the readers pipeline as needed. + encReaderMaxBytes := h.config.MaxBytes * 4 + r, err = readfile.NewEncodeReader(reader, readfile.Config{ Codec: h.encoding, BufferSize: h.config.BufferSize, Terminator: h.config.LineTerminator, + MaxBytes: encReaderMaxBytes, }) if err != nil { return nil, err diff --git a/libbeat/reader/readfile/encode.go b/libbeat/reader/readfile/encode.go index d2cf3cce4be..b5b526ad361 100644 --- a/libbeat/reader/readfile/encode.go +++ b/libbeat/reader/readfile/encode.go @@ -38,6 +38,7 @@ type Config struct { Codec encoding.Encoding BufferSize int Terminator LineTerminator + MaxBytes int } // New creates a new Encode reader from input reader by applying diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 5ea9150a8b9..e9ba491f483 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -28,12 +28,15 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) +const unlimited = 0 + // lineReader reads lines from underlying reader, decoding the input stream // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line.
type LineReader struct { reader io.Reader bufferSize int + maxBytes int // max bytes per line limit to avoid OOM with malformatted files nl []byte decodedNl []byte inBuffer *streambuf.Buffer @@ -62,6 +65,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) { return &LineReader{ reader: input, bufferSize: config.BufferSize, + maxBytes: config.MaxBytes, decoder: config.Codec.NewDecoder(), nl: nl, decodedNl: terminator, @@ -121,9 +125,9 @@ func (r *LineReader) advance() error { // Initial check if buffer has already a newLine character idx := r.inBuffer.IndexFrom(r.inOffset, r.nl) - // fill inBuffer until newline sequence has been found in input buffer + // Fill inBuffer until newline sequence has been found in input buffer for idx == -1 { - // increase search offset to reduce iterations on buffer when looping + // Increase search offset to reduce iterations on buffer when looping newOffset := r.inBuffer.Len() - len(r.nl) if newOffset > r.inOffset { r.inOffset = newOffset @@ -131,7 +135,7 @@ func (r *LineReader) advance() error { buf := make([]byte, r.bufferSize) - // try to read more bytes into buffer + // Try to read more bytes into buffer n, err := r.reader.Read(buf) // Appends buffer also in case of err @@ -140,16 +144,39 @@ func (r *LineReader) advance() error { return err } - // empty read => return buffer error (more bytes required error) + // Empty read => return buffer error (more bytes required error) if n == 0 { return streambuf.ErrNoMoreBytes } // Check if buffer has newLine character idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) + + // If max bytes limit per line is set, then drop the lines that are longer + if r.maxBytes != 0 { + // If newLine is found, drop the lines longer than maxBytes + for idx != -1 && idx > r.maxBytes { + r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx) + err = r.inBuffer.Advance(idx + len(r.nl)) + r.inBuffer.Reset() + r.inOffset = 0 + idx = 
r.inBuffer.IndexFrom(r.inOffset, r.nl) + } + + // If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine + if idx == -1 && r.inBuffer.Len() > r.maxBytes { + skipped, err := r.skipUntilNewLine(buf) + if err != nil { + r.logger.Error("Error skipping until new line, err:", err) + return err + } + r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped) + idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) + } + } } - // found encoded byte sequence for newline in buffer + // Found encoded byte sequence for newline in buffer // -> decode input sequence into outBuffer sz, err := r.decode(idx + len(r.nl)) if err != nil { @@ -158,20 +185,63 @@ func (r *LineReader) advance() error { sz = idx + len(r.nl) } - // consume transformed bytes from input buffer + // Consume transformed bytes from input buffer err = r.inBuffer.Advance(sz) r.inBuffer.Reset() - // continue scanning input buffer from last position + 1 + // Continue scanning input buffer from last position + 1 r.inOffset = idx + 1 - sz if r.inOffset < 0 { - // fix inOffset if newline has encoding > 8bits + firl line has been decoded + // Fix inOffset if newline has encoding > 8bits + firl line has been decoded r.inOffset = 0 } return err } +func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { + // The length of the line skipped + skipped := r.inBuffer.Len() + + // Clean up the buffer + err := r.inBuffer.Advance(skipped) + r.inBuffer.Reset() + + // Reset inOffset + r.inOffset = 0 + + if err != nil { + return 0, err + } + + // Read until the new line is found + for idx := -1; idx == -1; { + n, err := r.reader.Read(buf) + + // Check bytes read for newLine + if n > 0 { + idx = bytes.Index(buf[:n], r.nl) + + if idx != -1 { + r.inBuffer.Append(buf[idx+len(r.nl) : n]) + skipped += idx + } else { + skipped += n + } + } + + if err != nil { + return skipped, err + } + + if n == 0 { + return skipped, streambuf.ErrNoMoreBytes + } 
+ } + + return skipped, nil +} + func (r *LineReader) decode(end int) (int, error) { var err error buffer := make([]byte, 1024) diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index 13b13127a86..10a1ff958b5 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -21,8 +21,12 @@ package readfile import ( "bytes" + "encoding/hex" + "io" "math/rand" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" "golang.org/x/text/transform" @@ -31,10 +35,12 @@ import ( ) // Sample texts are from http://www.columbia.edu/~kermit/utf8.html -var tests = []struct { +type lineTestCase struct { encoding string strings []string -}{ +} + +var tests = []lineTestCase{ {"plain", []string{"I can", "eat glass"}}, {"latin1", []string{"I kå Glas frässa", "ond des macht mr nix!"}}, {"utf-16be", []string{"Pot să mănânc sticlă", "și ea nu mă rănește."}}, @@ -71,13 +77,10 @@ var tests = []struct { } func TestReaderEncodings(t *testing.T) { - for _, test := range tests { - t.Logf("test codec: %v", test.encoding) - + runTest := func(t *testing.T, test lineTestCase) { codecFactory, ok := encoding.FindEncoding(test.encoding) if !ok { - t.Errorf("can not find encoding '%v'", test.encoding) - continue + t.Fatalf("can not find encoding '%v'", test.encoding) } buffer := bytes.NewBuffer(nil) @@ -94,10 +97,9 @@ func TestReaderEncodings(t *testing.T) { } // create line reader - reader, err := NewLineReader(buffer, Config{codec, 1024, LineFeed}) + reader, err := NewLineReader(buffer, Config{codec, 1024, LineFeed, unlimited}) if err != nil { - t.Errorf("failed to initialize reader: %v", err) - continue + t.Fatal("failed to initialize reader:", err) } // read decodec lines from buffer @@ -120,9 +122,8 @@ func TestReaderEncodings(t *testing.T) { // validate lines and byte offsets if len(test.strings) != len(readLines) { - t.Errorf("number of lines mismatch (expected=%v actual=%v)", + t.Fatalf("number of lines mismatch 
(expected=%v actual=%v)", len(test.strings), len(readLines)) - continue } for i := range test.strings { expected := test.strings[i] @@ -131,6 +132,12 @@ func TestReaderEncodings(t *testing.T) { assert.Equal(t, expectedCount[i], byteCounts[i]) } } + + for _, test := range tests { + t.Run(test.encoding, func(t *testing.T) { + runTest(t, test) + }) + } } func TestLineTerminators(t *testing.T) { @@ -150,7 +157,7 @@ func TestLineTerminators(t *testing.T) { buffer.Write([]byte("this is my second line")) buffer.Write(nl) - reader, err := NewLineReader(buffer, Config{codec, 1024, terminator}) + reader, err := NewLineReader(buffer, Config{codec, 1024, terminator, unlimited}) if err != nil { t.Errorf("failed to initialize reader: %v", err) continue @@ -222,7 +229,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) { // initialize reader buffer := bytes.NewBuffer(inputStream) codec, _ := encoding.Plain(buffer) - reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), LineFeed}) + reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), LineFeed, unlimited}) if err != nil { t.Fatalf("Error initializing reader: %v", err) } @@ -248,3 +255,137 @@ func testReadLines(t *testing.T, inputLines [][]byte) { func testReadLine(t *testing.T, line []byte) { testReadLines(t, [][]byte{line}) } + +func randomInt(r *rand.Rand, min, max int) int { + return r.Intn(max+1-min) + min +} + +func randomBool(r *rand.Rand) bool { + n := randomInt(r, 0, 1) + return n != 0 +} + +func randomBytes(r *rand.Rand, sz int) ([]byte, error) { + bytes := make([]byte, sz) + if _, err := rand.Read(bytes); err != nil { + return nil, err + } + return bytes, nil +} + +func randomString(r *rand.Rand, sz int) (string, error) { + if sz == 0 { + return "", nil + } + + var bytes []byte + var err error + if bytes, err = randomBytes(r, sz/2+sz%2); err != nil { + return "", err + } + s := hex.EncodeToString(bytes) + return s[:sz], nil +} + +func setupTestMaxBytesLimit(lineMaxLimit, lineLen int, nl 
[]byte) (lines []string, data string, err error) { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + lineCount := randomInt(rnd, 11, 142) + lines = make([]string, lineCount) + + var b strings.Builder + + for i := 0; i < lineCount; i++ { + var sz int + // Non-empty line + if randomBool(rnd) { + // Boundary to the lineMaxLimit + if randomBool(rnd) { + sz = randomInt(rnd, lineMaxLimit-1, lineMaxLimit+1) + } else { + sz = randomInt(rnd, 0, lineLen) + } + } else { + // Randomly empty or one characters lines(another possibly boundary conditions) + sz = randomInt(rnd, 0, 1) + } + + s, err := randomString(rnd, sz) + if err != nil { + return nil, "", err + } + + lines[i] = s + if len(s) > 0 { + b.WriteString(s) + } + b.Write(nl) + } + return lines, b.String(), nil +} + +func TestMaxBytesLimit(t *testing.T) { + const ( + enc = "plain" + numberOfLines = 102 + bufferSize = 1024 + lineMaxLimit = 3012 + lineLen = 5720 // exceeds lineMaxLimit + ) + + codecFactory, ok := encoding.FindEncoding(enc) + if !ok { + t.Fatalf("can not find encoding '%v'", enc) + } + + buffer := bytes.NewBuffer(nil) + codec, _ := codecFactory(buffer) + nl := lineTerminatorCharacters[LineFeed] + + // Generate random lines lengths including empty lines + lines, input, err := setupTestMaxBytesLimit(lineMaxLimit, lineLen, nl) + if err != nil { + t.Fatal("failed to generate random input:", err) + } + + // Create line reader + reader, err := NewLineReader(strings.NewReader(input), Config{codec, bufferSize, LineFeed, lineMaxLimit}) + if err != nil { + t.Fatal("failed to initialize reader:", err) + } + + // Read decodec lines and test + var idx int + for i := 0; ; i++ { + b, n, err := reader.Next() + if err != nil { + if err == io.EOF { + break + } else { + t.Fatal("unexpected error:", err) + } + } + + // Find the next expected line from the original test array + var line string + for ; idx < len(lines); idx++ { + // Expected to be dropped + if len(lines[idx]) > lineMaxLimit { + continue + } + line = 
lines[idx] + idx++ + break + } + + gotLen := n - len(nl) + s := string(b[:len(b)-len(nl)]) + if len(line) != gotLen { + t.Fatalf("invalid line length, expected: %d got: %d", len(line), gotLen) + } + + if line != s { + t.Fatalf("lines do not match, expected: %s got: %s", line, s) + } + } +}