From bef28006119df702e94e7271bf59a4ea8101ec10 Mon Sep 17 00:00:00 2001 From: Mrod1598 Date: Wed, 15 Sep 2021 09:58:37 -0400 Subject: [PATCH] Update encoding noop (#262) * Add new Nop split func and testing for it * Add test to file input * WIP * Add Tests & Modify Split func * Add test for Error * Update 'emit' func to dump bytes on nop * Add MaxLogSize tests * Change where to decode * Add Large Log Const --- operator/builtin/input/file/config.go | 2 +- operator/builtin/input/file/file.go | 2 +- operator/builtin/input/file/file_test.go | 142 +++++++++++++++++++++++ operator/builtin/input/file/reader.go | 26 +++-- operator/builtin/input/file/util_test.go | 9 ++ operator/builtin/input/tcp/tcp.go | 2 +- operator/builtin/input/udp/udp.go | 2 +- operator/helper/multiline.go | 34 +++++- operator/helper/multiline_test.go | 113 +++++++++++++++++- 9 files changed, 311 insertions(+), 21 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 29d8bcbe..64cfd1be 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -119,7 +119,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, } // Ensure that multiline is buildable - _, err = c.Splitter.Build(encoding.Encoding, false) + _, err = c.Splitter.Build(encoding.Encoding, false, int(c.MaxLogSize)) if err != nil { return nil, err } diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index b864f6e2..a6b10bc5 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -415,5 +415,5 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { // getMultiline returns helper.Splitter structure and error eventually func (f *InputOperator) getMultiline() (*helper.Splitter, error) { - return f.Splitter.Build(f.encoding.Encoding, false) + return f.Splitter.Build(f.encoding.Encoding, false, f.MaxLogSize) } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index cc0b9fe2..6cf01cff 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -205,6 +205,148 @@ func TestReadExistingLogs(t *testing.T) { waitForMessage(t, logReceived, "testlog2") } +// TestReadUsingNopEncoding tests when nop encoding is set, that the splitfunction returns all bytes unchanged. +func TestReadUsingNopEncoding(t *testing.T) { + tcs := []struct { + testName string + input []byte + test func(*testing.T, chan *entry.Entry) + }{ + { + "simple", + []byte("testlog1"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("testlog1")) + }, + }, + { + "longer than maxlogsize", + []byte("testlog1testlog2testlog3"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("testlog1")) + waitForByteMessage(t, c, []byte("testlog2")) + waitForByteMessage(t, c, []byte("testlog3")) + }, + }, + { + "doesn't hit max log size before eof", + []byte("testlog1testlog2test"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("testlog1")) + waitForByteMessage(t, c, []byte("testlog2")) + waitForByteMessage(t, c, []byte("test")) + }, + }, + { + "special characters", + []byte("testlog1\n\ttestlog2\n\t"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("testlog1")) + waitForByteMessage(t, c, []byte("\n\ttestlo")) + waitForByteMessage(t, c, []byte("g2\n\t")) + }, + }, + } + + t.Parallel() + + for _, tc := range tcs { + t.Run(tc.testName, func(t *testing.T) { + operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { + cfg.MaxLogSize = 8 + cfg.Encoding.Encoding = "nop" + }, nil) + // Create a file, then start + temp := openTemp(t, tempDir) + bytesWritten, err := temp.Write(tc.input) + require.Greater(t, bytesWritten, 0) + require.NoError(t, err) + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + tc.test(t, logReceived) + }) + } +} + +func TestNopEncodingDifferentLogSizes(t *testing.T) { + tcs := []struct { + testName string + input []byte + test func(*testing.T, chan *entry.Entry) + maxLogSize helper.ByteSize + }{ + { + "same size", + []byte("testlog1"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("testlog1")) + }, + 8, + }, + { + "massive log size", + []byte("testlog1"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("testlog1")) + }, + 8000000, + }, + { + "slightly larger log size", + []byte("testlog1"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("testlog1")) + }, + 9, + }, + { + "slightly smaller log size", + []byte("testlog1"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("testlog")) + waitForByteMessage(t, c, []byte("1")) + }, + 7, + }, + { + "tiny log size", + []byte("testlog1"), + func(t *testing.T, c chan *entry.Entry) { + waitForByteMessage(t, c, []byte("t")) + waitForByteMessage(t, c, []byte("e")) + waitForByteMessage(t, c, []byte("s")) + waitForByteMessage(t, c, []byte("t")) + waitForByteMessage(t, c, []byte("l")) + waitForByteMessage(t, c, []byte("o")) + waitForByteMessage(t, c, []byte("g")) + waitForByteMessage(t, c, []byte("1")) + }, + 1, + }, + } + + t.Parallel() + + for _, tc := range tcs { + t.Run(tc.testName, func(t *testing.T) { + operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { + cfg.MaxLogSize = tc.maxLogSize + cfg.Encoding.Encoding = "nop" + }, nil) + // Create a file, then start + temp := openTemp(t, tempDir) + bytesWritten, err := temp.Write(tc.input) + require.Greater(t, bytesWritten, 0) + require.NoError(t, err) + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + tc.test(t, logReceived) + }) + } +} + // ReadNewLogs tests that, after starting, if a new file is created // all the entries in that file are read from the beginning func TestReadNewLogs(t *testing.T) { diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 46f5f621..efaf60a4 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -25,6 +25,7 @@ import ( "golang.org/x/text/encoding" "golang.org/x/text/transform" + "github.com/open-telemetry/opentelemetry-log-collection/entry" "github.com/open-telemetry/opentelemetry-log-collection/errors" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" ) @@ -166,15 +167,22 @@ func (r *Reader) emit(ctx context.Context, msgBuf []byte) error { if len(msgBuf) == 0 { return nil } - - msg, err := r.decode(msgBuf) - if err != nil { - return fmt.Errorf("decode: %s", err) - } - - e, err := r.fileInput.NewEntry(msg) - if err != nil { - return fmt.Errorf("create entry: %s", err) + var e *entry.Entry + var err error + if r.fileInput.encoding.Encoding == encoding.Nop { + e, err = r.fileInput.NewEntry(msgBuf) + if err != nil { + return fmt.Errorf("create entry: %s", err) + } + } else { + msg, err := r.decode(msgBuf) + if err != nil { + return fmt.Errorf("decode: %s", err) + } + e, err = r.fileInput.NewEntry(msg) + if err != nil { + return fmt.Errorf("create entry: %s", err) + } } if err := e.Set(r.fileInput.FilePathField, r.fileAttributes.Path); err != nil { diff --git a/operator/builtin/input/file/util_test.go b/operator/builtin/input/file/util_test.go index 16870bbb..2e8dfb07 100644 --- a/operator/builtin/input/file/util_test.go +++ b/operator/builtin/input/file/util_test.go @@ -151,6 +151,15 @@ func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { } } +func waitForByteMessage(t *testing.T, c chan *entry.Entry, expected []byte) { + select { + case e := <-c: + require.Equal(t, expected, e.Body.([]byte)) + case <-time.After(3 * time.Second): + require.FailNow(t, "Timed out waiting for message", expected) + } +} + func waitForMessages(t *testing.T, c chan *entry.Entry, expected []string) { receivedMessages := make([]string, 0, len(expected)) LOOP: diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 3adbcd37..9979781d 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -99,7 +99,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato } // Build multiline - splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil) + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil, int(c.MaxLogSize)) if err != nil { return nil, err } diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 080592ee..020db87f 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -82,7 +82,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato } // Build multiline - splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil) + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil, MaxUDPSize) if err != nil { return nil, err } diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index af822d1a..13785c44 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -116,20 +116,24 @@ type MultilineConfig struct { } // Build will build a Multiline operator. -func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) { - return c.getSplitFunc(encoding, flushAtEOF, force) +func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) { + return c.getSplitFunc(encoding, flushAtEOF, force, maxLogSize) } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern -func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) { +func (c MultilineConfig) getSplitFunc(encodingVar encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern switch { case endPattern != "" && startPattern != "": return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") + case encodingVar == encoding.Nop && (endPattern != "" || startPattern != ""): + return nil, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding") + case encodingVar == encoding.Nop: + return SplitNone(maxLogSize), nil case endPattern == "" && startPattern == "": - return NewNewlineSplitFunc(encoding, flushAtEOF, force) + return NewNewlineSplitFunc(encodingVar, flushAtEOF, force) case endPattern != "": re, err := regexp.Compile("(?m)" + c.LineEndPattern) if err != nil { @@ -204,6 +208,24 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) b } } +// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. This is for when the encoding is nop +func SplitNone(maxLogSize int) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if len(data) >= maxLogSize { + return maxLogSize, data[:maxLogSize], nil + } + + if !atEOF { + return 0, nil, nil + } + + if len(data) == 0 { + return 0, nil, nil + } + return len(data), data, nil + } +} + // NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc { @@ -313,9 +335,9 @@ func NewSplitterConfig() SplitterConfig { } // Build builds Splitter struct -func (c *SplitterConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Splitter, error) { +func (c *SplitterConfig) Build(encoding encoding.Encoding, flushAtEOF bool, maxLogSize int) (*Splitter, error) { flusher := c.Flusher.Build() - splitFunc, err := c.Multiline.Build(encoding, flushAtEOF, flusher) + splitFunc, err := c.Multiline.Build(encoding, flushAtEOF, flusher, maxLogSize) if err != nil { return nil, err diff --git a/operator/helper/multiline_test.go b/operator/helper/multiline_test.go index 4bc7b6fc..1952ffec 100644 --- a/operator/helper/multiline_test.go +++ b/operator/helper/multiline_test.go @@ -18,6 +18,7 @@ import ( "bufio" "bytes" "errors" + "fmt" "regexp" "testing" "time" @@ -174,7 +175,7 @@ func TestLineStartSplitFunc(t *testing.T) { LineStartPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher, 0) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -314,7 +315,7 @@ func TestLineEndSplitFunc(t *testing.T) { LineEndPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher, 0) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -420,6 +421,114 @@ func TestNewlineSplitFunc(t *testing.T) { } } +type noSplitTestCase struct { + Name string + Raw []byte + ExpectedTokenized [][]byte +} + +func (tc noSplitTestCase) RunFunc(splitFunc bufio.SplitFunc) func(t *testing.T) { + return func(t *testing.T) { + scanner := bufio.NewScanner(bytes.NewReader(tc.Raw)) + scanner.Split(splitFunc) + tokenized := make([][]byte, 0) + for { + ok := scanner.Scan() + if !ok { + break + } + tokenized = append(tokenized, scanner.Bytes()) + } + + assert.Equal(t, tc.ExpectedTokenized, tokenized) + } +} + +func TestNoSplitFunc(t *testing.T) { + const largeLogSize = 100 + testCases := []noSplitTestCase{ + { + Name: "OneLogSimple", + Raw: []byte("my log\n"), + ExpectedTokenized: [][]byte{ + []byte("my log\n"), + }, + }, + { + Name: "TwoLogsSimple", + Raw: []byte("log1\nlog2\n"), + ExpectedTokenized: [][]byte{ + []byte("log1\nlog2\n"), + }, + }, + { + Name: "TwoLogsCarriageReturn", + Raw: []byte("log1\r\nlog2\r\n"), + ExpectedTokenized: [][]byte{ + []byte("log1\r\nlog2\r\n"), + }, + }, + { + Name: "NoTailingNewline", + Raw: []byte(`foo`), + ExpectedTokenized: [][]byte{[]byte("foo")}, + }, + { + Name: "HugeLog100", + Raw: func() []byte { + return generatedByteSliceOfLength(largeLogSize) + }(), + ExpectedTokenized: [][]byte{ + generatedByteSliceOfLength(100), + }, + }, + { + Name: "HugeLog300", + Raw: func() []byte { + return generatedByteSliceOfLength(largeLogSize * 3) + }(), + ExpectedTokenized: [][]byte{ + []byte("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuv"), + []byte("wxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqr"), + []byte("stuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmn"), + }, + }, + { + Name: "EOFBeforeMaxLogSize", + Raw: func() []byte { + return generatedByteSliceOfLength(largeLogSize * 3.5) + }(), + ExpectedTokenized: [][]byte{ + []byte("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuv"), + []byte("wxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqr"), + []byte("stuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmn"), + []byte("opqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijkl"), + }, + }, + } + + for _, tc := range testCases { + splitFunc := SplitNone(largeLogSize) + t.Run(tc.Name, tc.RunFunc(splitFunc)) + } +} + +func TestNoopEncodingError(t *testing.T) { + cfg := &MultilineConfig{ + LineEndPattern: "\n", + } + + _, err := cfg.getSplitFunc(encoding.Nop, false, nil, 0) + require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) + + cfg = &MultilineConfig{ + LineStartPattern: "\n", + } + + _, err = cfg.getSplitFunc(encoding.Nop, false, nil, 0) + require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) +} + func TestNewlineSplitFunc_Encodings(t *testing.T) { cases := []struct { name string