Skip to content

Commit

Permalink
bugfix: Add max bytes in line limit elastic#19552
Browse files Browse the repository at this point in the history
cherry-pick from elastic#19552
  • Loading branch information
benero committed Jul 20, 2021
1 parent 4fcf553 commit 9386dbf
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 17 deletions.
12 changes: 11 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,17 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize)
// Configure the MaxBytes limit for EncodeReader as h.config.MaxBytes multiplied by 4
// for the worst-case scenario where incoming UTF-32 characters are decoded to single-byte UTF-8 characters.
// This limit serves primarily to avoid memory bloat or a potential OOM with unexpectedly long lines in the file.
// Further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := h.config.MaxBytes * 4

r, err = readfile.NewEncodeReader(reader, readfile.Config{
Codec: h.encoding,
BufferSize: h.config.BufferSize,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (p *Input) scan() {

// Decides if previous state exists
if lastState.IsEmpty() {
logp.Debug("input", "Start harvester for new file: %s, offset: %s", newState.Source, newState.Offset)
logp.Debug("input", "Start harvester for new file: %s, offset: %d", newState.Source, newState.Offset)
err := p.startHarvester(newState, 0)
if err == errHarvesterLimit {
logp.Debug("input", harvesterErrMsg, newState.Source, err)
Expand Down
19 changes: 12 additions & 7 deletions libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package readfile

import (
"bytes"
"io"
"time"

Expand All @@ -31,14 +32,18 @@ type EncoderReader struct {
reader *LineReader
}

// Config stores the configuration for the readers required to read
// a file line by line.
type Config struct {
// Codec is the encoding from which NewLineReader builds the encoder for
// the newline sequence and the decoder stored on the LineReader.
Codec encoding.Encoding
// BufferSize is copied into LineReader.bufferSize.
// NOTE(review): presumably the size in bytes of the read buffer — confirm.
BufferSize int
// MaxBytes is the per-line limit, in bytes of the raw input, above which
// LineReader drops a line; 0 disables the limit.
MaxBytes int
}

// NewEncodeReader creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(
r io.Reader,
codec encoding.Encoding,
bufferSize int,
) (EncoderReader, error) {
eReader, err := NewLineReader(r, codec, bufferSize)
func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
return EncoderReader{eReader}, err
}

Expand All @@ -49,7 +54,7 @@ func (r EncoderReader) Next() (reader.Message, error) {
// Creating message object
return reader.Message{
Ts: time.Now(),
Content: c,
Content: bytes.Trim(c, "\xef\xbb\xbf"),
Bytes: sz,
}, err
}
9 changes: 8 additions & 1 deletion libbeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package readfile

import (
"fmt"

"github.com/elastic/beats/libbeat/reader"
)

Expand All @@ -37,7 +39,12 @@ func NewLimitReader(r reader.Reader, maxBytes int) *LimitReader {
func (r *LimitReader) Next() (reader.Message, error) {
message, err := r.reader.Next()
if len(message.Content) > r.maxBytes {
message.Content = message.Content[:r.maxBytes]
tmp := make([]byte, r.maxBytes)
n := copy(tmp, message.Content)
if n != r.maxBytes {
return message, fmt.Errorf("unexpected number of bytes were copied, %d instead of limit %d", n, r.maxBytes)
}
message.Content = tmp
message.AddFlagsWithKey("log.flags", "truncated")
}
return message, err
Expand Down
81 changes: 76 additions & 5 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package readfile

import (
"bytes"
"io"

"golang.org/x/text/encoding"
Expand All @@ -27,13 +28,16 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

// unlimited is the MaxBytes value that disables the per-line size limit.
const unlimited = 0

// LineReader reads lines from the underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
codec encoding.Encoding
bufferSize int
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
Expand All @@ -43,8 +47,8 @@ type LineReader struct {
}

// NewLineReader creates a new reader object
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) {
encoder := codec.NewEncoder()
func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

// Create newline char based on encoding
nl, _, err := transform.Bytes(encoder, []byte{'\n'})
Expand All @@ -54,10 +58,11 @@ func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*L

return &LineReader{
reader: input,
codec: codec,
bufferSize: bufferSize,
codec: config.Codec,
bufferSize: config.BufferSize,
maxBytes: config.MaxBytes,
nl: nl,
decoder: codec.NewDecoder(),
decoder: config.Codec.NewDecoder(),
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
}, nil
Expand Down Expand Up @@ -138,6 +143,29 @@ func (r *LineReader) advance() error {

// Check if buffer has newLine character
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)

// If max bytes limit per line is set, then drop the lines that are longer
if r.maxBytes != 0 {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
if err != nil {
logp.Err("Error skipping until new line, err: %v", err)
return err
}
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}
}
}

// found encoded byte sequence for '\n' in buffer
Expand All @@ -163,6 +191,49 @@ func (r *LineReader) advance() error {
return err
}

// skipUntilNewLine discards the oversized line currently being read.
//
// Everything held in the input buffer is dropped first; then the underlying
// reader is read into buf, chunk by chunk, until a chunk containing the
// encoded newline sequence shows up. The bytes following that newline are
// appended to the (now empty) input buffer so regular line processing can
// resume with them.
//
// It returns the number of bytes skipped (the newline itself is not counted).
// A read error is returned as is together with the count so far; a read that
// yields zero bytes without an error results in streambuf.ErrNoMoreBytes.
//
// NOTE(review): each chunk is searched on its own, so a multi-byte newline
// sequence split across two reads would not be detected here.
func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
	// All buffered bytes belong to the line being dropped.
	total := r.inBuffer.Len()

	// Empty the buffer and restart scanning from its beginning.
	advErr := r.inBuffer.Advance(total)
	r.inBuffer.Reset()
	r.inOffset = 0
	if advErr != nil {
		return 0, advErr
	}

	for {
		n, readErr := r.reader.Read(buf)

		found := false
		if n > 0 {
			chunk := buf[:n]
			if pos := bytes.Index(chunk, r.nl); pos != -1 {
				// Keep whatever follows the newline for the next line.
				r.inBuffer.Append(chunk[pos+len(r.nl):])
				total += pos
				found = true
			} else {
				total += n
			}
		}

		if readErr != nil {
			return total, readErr
		}
		if n == 0 {
			return total, streambuf.ErrNoMoreBytes
		}
		if found {
			return total, nil
		}
	}
}

func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
Expand Down
143 changes: 141 additions & 2 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ package readfile

import (
"bytes"
"encoding/hex"
"io"
"io/ioutil"
"math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/text/transform"
Expand Down Expand Up @@ -93,7 +98,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := NewLineReader(buffer, codec, 1024)
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, unlimited})
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -184,7 +189,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := NewLineReader(buffer, codec, buffer.Len())
reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), unlimited})
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand All @@ -210,3 +215,137 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
func testReadLine(t *testing.T, line []byte) {
testReadLines(t, [][]byte{line})
}

// randomInt draws a pseudo-random integer from r in the closed
// interval [min, max].
func randomInt(r *rand.Rand, min, max int) int {
	span := max - min + 1
	return min + r.Intn(span)
}

// randomBool flips a coin using r: it draws 0 or 1 via randomInt and
// reports whether the draw was non-zero.
func randomBool(r *rand.Rand) bool {
	return randomInt(r, 0, 1) != 0
}

// randomBytes returns sz pseudo-random bytes drawn from r.
//
// The bytes come from the supplied generator rather than the package-level
// source, so the result is reproducible for a given seed of r.
// The returned error is whatever r.Read reports.
func randomBytes(r *rand.Rand, sz int) ([]byte, error) {
	buf := make([]byte, sz)
	if _, err := r.Read(buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// randomString returns a string of exactly sz lowercase hexadecimal
// characters built from bytes obtained through randomBytes. A zero sz
// yields the empty string without requesting any bytes.
func randomString(r *rand.Rand, sz int) (string, error) {
	if sz == 0 {
		return "", nil
	}

	// Every raw byte encodes to two hex digits, so request half of sz,
	// rounded up, and trim the encoded result to the requested length.
	raw, err := randomBytes(r, sz/2+sz%2)
	if err != nil {
		return "", err
	}
	return hex.EncodeToString(raw)[:sz], nil
}

// setupTestMaxBytesLimit generates random input for the max-bytes test.
//
// It produces between 11 and 142 lines using a time-seeded generator and
// returns them both as individual strings (without the line terminator) and
// as a single data string in which every line is followed by nl.
//
// Line lengths are picked by coin flips: either 0 or 1 characters, or a
// "non-empty" candidate whose length is either right at the boundary
// (lineMaxLimit-1 .. lineMaxLimit+1) or anywhere in 0 .. lineLen.
func setupTestMaxBytesLimit(lineMaxLimit, lineLen int, nl []byte) (lines []string, data string, err error) {
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	total := randomInt(rnd, 11, 142)
	lines = make([]string, total)

	var sb strings.Builder
	for i := range lines {
		var size int
		switch {
		case !randomBool(rnd):
			// Empty or single-character line (another boundary condition).
			size = randomInt(rnd, 0, 1)
		case randomBool(rnd):
			// Length right around the configured limit.
			size = randomInt(rnd, lineMaxLimit-1, lineMaxLimit+1)
		default:
			size = randomInt(rnd, 0, lineLen)
		}

		text, err := randomString(rnd, size)
		if err != nil {
			return nil, "", err
		}

		lines[i] = text
		sb.WriteString(text)
		sb.Write(nl)
	}

	return lines, sb.String(), nil
}

// TestMaxBytesLimit feeds randomly generated lines to a LineReader configured
// with a per-line byte limit and checks that every line it returns equals the
// next generated line that does not exceed the limit, i.e. that longer lines
// are dropped and the remaining ones come through unchanged and in order.
func TestMaxBytesLimit(t *testing.T) {
	const (
		enc           = "plain"
		numberOfLines = 102
		bufferSize    = 1024
		lineMaxLimit  = 3012
		lineLen       = 5720 // exceeds lineMaxLimit
	)

	codecFactory, ok := encoding.FindEncoding(enc)
	if !ok {
		t.Fatalf("can not find encoding '%v'", enc)
	}

	buffer := bytes.NewBuffer(nil)
	codec, _ := codecFactory(buffer)
	nl := []byte{'\n'}

	// Random line lengths, including empty lines and lines over the limit.
	lines, input, err := setupTestMaxBytesLimit(lineMaxLimit, lineLen, nl)
	if err != nil {
		t.Fatal("failed to generate random input:", err)
	}

	reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, lineMaxLimit})
	if err != nil {
		t.Fatal("failed to initialize reader:", err)
	}

	// Read decoded lines and compare each with the next expected one.
	next := 0
	for {
		content, size, err := reader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal("unexpected error:", err)
		}

		// Generated lines above the limit are expected to have been dropped.
		for next < len(lines) && len(lines[next]) > lineMaxLimit {
			next++
		}
		var want string
		if next < len(lines) {
			want = lines[next]
			next++
		}

		// Both the byte count and the content include the line terminator.
		gotLen := size - len(nl)
		got := string(content[:len(content)-len(nl)])
		if len(want) != gotLen {
			t.Fatalf("invalid line length, expected: %d got: %d", len(want), gotLen)
		}

		if want != got {
			t.Fatalf("lines do not match, expected: %s got: %s", want, got)
		}
	}
}

0 comments on commit 9386dbf

Please sign in to comment.