Skip to content

Commit

Permalink
Limit the number of bytes read by LineReader in Filebeat (elastic#19552)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0e049f0)
  • Loading branch information
aleksmaus authored and kvch committed Jul 22, 2020
1 parent 93abfd9 commit 25b3ebf
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v6.8.0...6.8.1[Check the HEAD diff]

*Filebeat*

- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552]

*Heartbeat*

Expand Down
10 changes: 9 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
var r reader.Reader
var err error

logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
Expand All @@ -570,7 +572,13 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize)
// Configure MaxBytes limit for EncodeReader as multiplied by 4
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := h.config.MaxBytes * 4

r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize, encReaderMaxBytes)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = readfile.NewEncodeReader(in, enc, 4096)
r, err = readfile.NewEncodeReader(in, enc, 4096, 4096)
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type EncoderReader struct {
func NewEncodeReader(
r io.Reader,
codec encoding.Encoding,
bufferSize int,
bufferSize, maxBytes int,
) (EncoderReader, error) {
eReader, err := NewLineReader(r, codec, bufferSize)
eReader, err := NewLineReader(r, codec, bufferSize, maxBytes)
return EncoderReader{eReader}, err
}

Expand Down
71 changes: 70 additions & 1 deletion filebeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package readfile

import (
"bytes"
"io"

"golang.org/x/text/encoding"
Expand All @@ -34,6 +35,7 @@ type LineReader struct {
reader io.Reader
codec encoding.Encoding
bufferSize int
maxBytes int
nl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
Expand All @@ -43,7 +45,7 @@ type LineReader struct {
}

// New creates a new reader object
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) {
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize, maxBytes int) (*LineReader, error) {
encoder := codec.NewEncoder()

// Create newline char based on encoding
Expand All @@ -56,6 +58,7 @@ func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*L
reader: input,
codec: codec,
bufferSize: bufferSize,
maxBytes: maxBytes,
nl: nl,
decoder: codec.NewDecoder(),
inBuffer: streambuf.New(nil),
Expand Down Expand Up @@ -138,6 +141,29 @@ func (r *LineReader) advance() error {

// Check if buffer has newLine character
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)

// If max bytes limit per line is set, then drop the lines that are longer
if r.maxBytes != 0 {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
if err != nil {
logp.Err("Error skipping until new line, err: %s", err)
return err
}
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}
}
}

// found encoded byte sequence for '\n' in buffer
Expand All @@ -163,6 +189,49 @@ func (r *LineReader) advance() error {
return err
}

func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
// The length of the line skipped
skipped := r.inBuffer.Len()

// Clean up the buffer
err := r.inBuffer.Advance(skipped)
r.inBuffer.Reset()

// Reset inOffset
r.inOffset = 0

if err != nil {
return 0, err
}

// Read until the new line is found
for idx := -1; idx == -1; {
n, err := r.reader.Read(buf)

// Check bytes read for newLine
if n > 0 {
idx = bytes.Index(buf[:n], r.nl)

if idx != -1 {
r.inBuffer.Append(buf[idx+len(r.nl) : n])
skipped += idx
} else {
skipped += n
}
}

if err != nil {
return skipped, err
}

if n == 0 {
return skipped, streambuf.ErrNoMoreBytes
}
}

return skipped, nil
}

func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
Expand Down
143 changes: 141 additions & 2 deletions filebeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ package readfile

import (
"bytes"
"encoding/hex"
"io"
"math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/text/transform"
Expand Down Expand Up @@ -68,7 +72,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := NewLineReader(buffer, codec, 1024)
reader, err := NewLineReader(buffer, codec, 1024, 1024)
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -159,7 +163,8 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := NewLineReader(buffer, codec, buffer.Len())
bufLen := buffer.Len()
reader, err := NewLineReader(buffer, codec, bufLen, bufLen)
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand All @@ -185,3 +190,137 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
func testReadLine(t *testing.T, line []byte) {
testReadLines(t, [][]byte{line})
}

func randomInt(r *rand.Rand, min, max int) int {
return r.Intn(max+1-min) + min
}

func randomBool(r *rand.Rand) bool {
n := randomInt(r, 0, 1)
return n != 0
}

func randomBytes(r *rand.Rand, sz int) ([]byte, error) {
bytes := make([]byte, sz)
if _, err := rand.Read(bytes); err != nil {
return nil, err
}
return bytes, nil
}

func randomString(r *rand.Rand, sz int) (string, error) {
if sz == 0 {
return "", nil
}

var bytes []byte
var err error
if bytes, err = randomBytes(r, sz/2+sz%2); err != nil {
return "", err
}
s := hex.EncodeToString(bytes)
return s[:sz], nil
}

func setupTestMaxBytesLimit(lineMaxLimit, lineLen int, nl []byte) (lines []string, data string, err error) {
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

lineCount := randomInt(rnd, 11, 142)
lines = make([]string, lineCount)

var b strings.Builder

for i := 0; i < lineCount; i++ {
var sz int
// Non-empty line
if randomBool(rnd) {
// Boundary to the lineMaxLimit
if randomBool(rnd) {
sz = randomInt(rnd, lineMaxLimit-1, lineMaxLimit+1)
} else {
sz = randomInt(rnd, 0, lineLen)
}
} else {
// Randomly empty or one characters lines(another possibly boundary conditions)
sz = randomInt(rnd, 0, 1)
}

s, err := randomString(rnd, sz)
if err != nil {
return nil, "", err
}

lines[i] = s
if len(s) > 0 {
b.WriteString(s)
}
b.Write(nl)
}
return lines, b.String(), nil
}

func TestMaxBytesLimit(t *testing.T) {
const (
enc = "plain"
numberOfLines = 102
bufferSize = 1024
lineMaxLimit = 3012
lineLen = 5720 // exceeds lineMaxLimit
)

codecFactory, ok := encoding.FindEncoding(enc)
if !ok {
t.Fatalf("can not find encoding '%v'", enc)
}

buffer := bytes.NewBuffer(nil)
codec, _ := codecFactory(buffer)

// Generate random lines lengths including empty lines
nl := []byte("\n")
lines, input, err := setupTestMaxBytesLimit(lineMaxLimit, lineLen, nl)
if err != nil {
t.Fatal("failed to generate random input:", err)
}

// Create line reader
reader, err := NewLineReader(strings.NewReader(input), codec, bufferSize, lineMaxLimit)
if err != nil {
t.Fatal("failed to initialize reader:", err)
}

// Read decodec lines and test
var idx int
for i := 0; ; i++ {
b, n, err := reader.Next()
if err != nil {
if err == io.EOF {
break
} else {
t.Fatal("unexpected error:", err)
}
}

// Find the next expected line from the original test array
var line string
for ; idx < len(lines); idx++ {
// Expected to be dropped
if len(lines[idx]) > lineMaxLimit {
continue
}
line = lines[idx]
idx++
break
}

gotLen := n - len(nl)
s := string(b[:len(b)-len(nl)])
if len(line) != gotLen {
t.Fatalf("invalid line length, expected: %d got: %d", len(line), gotLen)
}

if line != s {
t.Fatalf("lines do not match, expected: %s got: %s", line, s)
}
}
}
2 changes: 1 addition & 1 deletion filebeat/scripts/tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) {
}

var r reader.Reader
r, err = readfile.NewEncodeReader(f, enc, 4096)
r, err = readfile.NewEncodeReader(f, enc, 4096, 4096)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 25b3ebf

Please sign in to comment.