diff --git a/CHANGELOG.md b/CHANGELOG.md index c71039a6d..395f68c79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased +- Add support for multiple encodings in the file input plugin + ## [0.9.2] - 2020-07-13 ### Added - Link `carbon` into `/usr/local/bin` so it's available on most users' `PATH` ([PR28](https://github.com/observIQ/carbon/pull/28)) diff --git a/docs/plugins/file_input.md b/docs/plugins/file_input.md index 4ff3e03a5..85ab6f819 100644 --- a/docs/plugins/file_input.md +++ b/docs/plugins/file_input.md @@ -16,6 +16,7 @@ The `file_input` plugin reads logs from files. It will place the lines read into | `file_path_field` | | A [field](/docs/types/field.md) that will be set to the path of the file the entry was read from | | `file_name_field` | | A [field](/docs/types/field.md) that will be set to the name of the file the entry was read from | | `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` | +| `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options | | `max_log_size` | 1048576 | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. | Note that by default, no logs will be read unless the monitored file is actively being written to because `start_at` defaults to `end`. @@ -27,6 +28,20 @@ If set, the `multiline` configuration block instructs the `file_input` plugin to The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that match either the beginning of a new log entry, or the end of a log entry. 
+### Supported encodings + +| Key | Description | +| --- | --- | +| `nop` | No encoding validation. Treats the file as a stream of raw bytes | +| `utf-8` | UTF-8 encoding | +| `utf-16le` | UTF-16 encoding with little-endian byte order | +| `utf-16be` | UTF-16 encoding with big-endian byte order | +| `ascii` | ASCII encoding | +| `big5` | The Big5 Chinese character encoding | + +Other less common encodings are supported on a best-effort basis. See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml) for other encodings available. + + ### Example Configurations #### Simple file input diff --git a/go.mod b/go.mod index 9d04907b2..13dd206bc 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( golang.org/x/net v0.0.0-20200301022130-244492dfa37a // indirect golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 golang.org/x/sys v0.0.0-20200513112337-417ce2331b5c // indirect + golang.org/x/text v0.3.2 golang.org/x/tools v0.0.0-20200513201620-d5fe73897c97 // indirect gonum.org/v1/gonum v0.6.2 google.golang.org/api v0.20.0 diff --git a/plugin/builtin/input/file/file.go b/plugin/builtin/input/file/file.go index b5848afac..d3b7cc8d3 100644 --- a/plugin/builtin/input/file/file.go +++ b/plugin/builtin/input/file/file.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "regexp" + "strings" "sync" "time" @@ -18,6 +19,9 @@ import ( "github.com/observiq/carbon/plugin" "github.com/observiq/carbon/plugin/helper" "go.uber.org/zap" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/ianaindex" + "golang.org/x/text/encoding/unicode" ) func init() { @@ -33,10 +37,11 @@ type InputConfig struct { PollInterval *plugin.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` - FilePathField *entry.Field `json:"file_path_field,omitempty" yaml:"file_path_field,omitempty"` + FilePathField 
*entry.Field `json:"file_path_field,omitempty" yaml:"file_path_field,omitempty"` FileNameField *entry.Field `json:"file_name_field,omitempty" yaml:"file_name_field,omitempty"` StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"` } // MultilineConfig is the configuration a multiline operation @@ -72,7 +77,12 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) { } } - splitFunc, err := c.getSplitFunc() + encoding, err := lookupEncoding(c.Encoding) + if err != nil { + return nil, err + } + + splitFunc, err := c.getSplitFunc(encoding) if err != nil { return nil, err } @@ -107,6 +117,7 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) { fileUpdateChan: make(chan fileUpdateMessage, 10), fingerprintBytes: 1000, startAtBeginning: startAtBeginning, + encoding: encoding, } if c.MaxLogSize == 0 { @@ -118,11 +129,39 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) { return plugin, nil } +var encodingOverrides = map[string]encoding.Encoding{ + "utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), + "utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM), + "utf8": unicode.UTF8, + "ascii": unicode.UTF8, + "us-ascii": unicode.UTF8, + "nop": encoding.Nop, + "": encoding.Nop, +} + +func lookupEncoding(enc string) (encoding.Encoding, error) { + if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok { + return encoding, nil + } + encoding, err := ianaindex.IANA.Encoding(enc) + if err != nil { + return nil, fmt.Errorf("unsupported encoding '%s'", enc) + } + if encoding == nil { + return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc) + } + return encoding, nil +} + // getSplitFunc will return the split function associated the configured mode. 
-func (c InputConfig) getSplitFunc() (bufio.SplitFunc, error) { +func (c InputConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { var splitFunc bufio.SplitFunc if c.Multiline == nil { - splitFunc = NewNewlineSplitFunc() + var err error + splitFunc, err = NewNewlineSplitFunc(encoding) + if err != nil { + return nil, err + } } else { definedLineEndPattern := c.Multiline.LineEndPattern != "" definedLineStartPattern := c.Multiline.LineStartPattern != "" @@ -168,6 +207,8 @@ type InputPlugin struct { fileUpdateChan chan fileUpdateMessage fingerprintBytes int64 + encoding encoding.Encoding + wg *sync.WaitGroup readerWg *sync.WaitGroup cancel context.CancelFunc @@ -273,7 +314,7 @@ func (f *InputPlugin) checkFile(ctx context.Context, path string, firstCheck boo go func(ctx context.Context, path string, offset, lastSeenSize int64) { defer f.readerWg.Done() messenger := f.newFileUpdateMessenger(path) - err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputPlugin, f.MaxLogSize) + err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputPlugin, f.MaxLogSize, f.encoding) if err != nil { f.Warnw("Failed to read log file", zap.Error(err)) } diff --git a/plugin/builtin/input/file/file_test.go b/plugin/builtin/input/file/file_test.go index d9ec6fa40..d4d570fec 100644 --- a/plugin/builtin/input/file/file_test.go +++ b/plugin/builtin/input/file/file_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "golang.org/x/text/encoding/unicode" ) func newTestFileSource(t *testing.T) (*InputPlugin, chan *entry.Entry) { @@ -46,6 +47,7 @@ func newTestFileSource(t *testing.T) (*InputPlugin, chan *entry.Entry) { }, SplitFunc: bufio.ScanLines, PollInterval: 50 * time.Millisecond, + encoding: unicode.UTF8, persist: helper.NewScopedDBPersister(db, "testfile"), runningFiles: 
make(map[string]struct{}), knownFiles: make(map[string]*knownFileInfo), @@ -746,3 +748,91 @@ func expectNoMessages(t *testing.T, c chan *entry.Entry) { case <-time.After(200 * time.Millisecond): } } + +func TestEncodings(t *testing.T) { + t.Parallel() + cases := []struct { + name string + contents []byte + encoding string + expected [][]byte + }{ + { + "Nop", + []byte{0xc5, '\n'}, + "", + [][]byte{{0xc5}}, + }, + { + "InvalidUTFReplacement", + []byte{0xc5, '\n'}, + "utf8", + [][]byte{{0xef, 0xbf, 0xbd}}, + }, + { + "ValidUTF8", + []byte("foo\n"), + "utf8", + [][]byte{[]byte("foo")}, + }, + { + "ChineseCharacter", + []byte{230, 138, 152, '\n'}, // 折\n + "utf8", + [][]byte{{230, 138, 152}}, + }, + { + "SmileyFaceUTF16", + []byte{216, 61, 222, 0, 0, 10}, // 😀\n + "utf-16be", + [][]byte{{240, 159, 152, 128}}, + }, + { + "SmileyFaceNewlineUTF16", + []byte{216, 61, 222, 0, 0, 10, 0, 102, 0, 111, 0, 111}, // 😀\nfoo + "utf-16be", + [][]byte{{240, 159, 152, 128}, {102, 111, 111}}, + }, + { + "SmileyFaceNewlineUTF16LE", + []byte{61, 216, 0, 222, 10, 0, 102, 0, 111, 0, 111, 0}, // 😀\nfoo + "utf-16le", + [][]byte{{240, 159, 152, 128}, {102, 111, 111}}, + }, + { + "ChineseCharacterBig5", + []byte{167, 233, 10}, // 折\n + "big5", + [][]byte{{230, 138, 152}}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tempDir := testutil.NewTempDir(t) + path := filepath.Join(tempDir, "in.log") + err := ioutil.WriteFile(path, tc.contents, 0777) + require.NoError(t, err) + + source, receivedEntries := newTestFileSource(t) + source.Include = []string{path} + source.encoding, err = lookupEncoding(tc.encoding) + require.NoError(t, err) + source.SplitFunc, err = NewNewlineSplitFunc(source.encoding) + require.NoError(t, err) + require.NotNil(t, source.encoding) + + err = source.Start() + require.NoError(t, err) + + for _, expected := range tc.expected { + select { + case entry := <-receivedEntries: + require.Equal(t, expected, []byte(entry.Record.(string))) + case 
<-time.After(time.Second): + require.FailNow(t, "Timed out waiting for entry to be read") + } + } + }) + } +} diff --git a/plugin/builtin/input/file/line_splitter.go b/plugin/builtin/input/file/line_splitter.go index dd72a2113..440216bc1 100644 --- a/plugin/builtin/input/file/line_splitter.go +++ b/plugin/builtin/input/file/line_splitter.go @@ -4,6 +4,8 @@ import ( "bufio" "bytes" "regexp" + + "golang.org/x/text/encoding" ) // NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into @@ -67,27 +69,40 @@ func NewLineEndSplitFunc(re *regexp.Regexp) bufio.SplitFunc { // NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewNewlineSplitFunc() bufio.SplitFunc { +func NewNewlineSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) { + newline, err := encodedNewline(encoding) + if err != nil { + return nil, err + } + + carriageReturn, err := encodedCarriageReturn(encoding) + if err != nil { + return nil, err + } + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { if atEOF && len(data) == 0 { return 0, nil, nil } - if i := bytes.IndexByte(data, '\n'); i >= 0 { + if i := bytes.Index(data, newline); i >= 0 { // We have a full newline-terminated line. - return i + 1, dropCR(data[0:i]), nil + return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil } // Request more data. return 0, nil, nil - } + }, nil } -// dropCR drops a terminal \r from the data. 
-func dropCR(data []byte) []byte { - if len(data) > 0 && data[len(data)-1] == '\r' { - return data[0 : len(data)-1] - } +func encodedNewline(encoding encoding.Encoding) ([]byte, error) { + out := make([]byte, 10) + nDst, _, err := encoding.NewEncoder().Transform(out, []byte{'\n'}, true) + return out[:nDst], err +} - return data +func encodedCarriageReturn(encoding encoding.Encoding) ([]byte, error) { + out := make([]byte, 10) + nDst, _, err := encoding.NewEncoder().Transform(out, []byte{'\r'}, true) + return out[:nDst], err } diff --git a/plugin/builtin/input/file/line_splitter_test.go b/plugin/builtin/input/file/line_splitter_test.go index 3e4721788..5b91e8f78 100644 --- a/plugin/builtin/input/file/line_splitter_test.go +++ b/plugin/builtin/input/file/line_splitter_test.go @@ -9,6 +9,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/unicode" ) type tokenizerTestCase struct { @@ -271,11 +273,80 @@ func TestNewlineSplitFunc(t *testing.T) { } for _, tc := range testCases { - splitFunc := NewNewlineSplitFunc() + splitFunc, err := NewNewlineSplitFunc(unicode.UTF8) + require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } } +func TestNewlineSplitFunc_Encodings(t *testing.T) { + cases := []struct { + name string + encoding encoding.Encoding + input []byte + tokens [][]byte + }{ + { + "Simple", + unicode.UTF8, + []byte("testlog\n"), + [][]byte{[]byte("testlog")}, + }, + { + "CarriageReturn", + unicode.UTF8, + []byte("testlog\r\n"), + [][]byte{[]byte("testlog")}, + }, + { + "SimpleUTF16", + unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM), + []byte{0, 116, 0, 101, 0, 115, 0, 116, 0, 108, 0, 111, 0, 103, 0, 10}, // testlog\n + [][]byte{{0, 116, 0, 101, 0, 115, 0, 116, 0, 108, 0, 111, 0, 103}}, + }, + { + "MultiUTF16", + unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM), + []byte{0, 108, 0, 111, 0, 103, 0, 49, 0, 10, 0, 108, 0, 111, 0, 103, 0, 50, 0, 10}, // 
log1\nlog2\n + [][]byte{ + {0, 108, 0, 111, 0, 103, 0, 49}, // log1 + {0, 108, 0, 111, 0, 103, 0, 50}, // log2 + }, + }, + { + "MultiCarriageReturnUTF16", + unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM), + []byte{0, 108, 0, 111, 0, 103, 0, 49, 0, 13, 0, 10, 0, 108, 0, 111, 0, 103, 0, 50, 0, 13, 0, 10}, // log1\r\nlog2\r\n + [][]byte{ + {0, 108, 0, 111, 0, 103, 0, 49}, // log1 + {0, 108, 0, 111, 0, 103, 0, 50}, // log2 + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + splitFunc, err := NewNewlineSplitFunc(tc.encoding) + require.NoError(t, err) + scanner := bufio.NewScanner(bytes.NewReader(tc.input)) + scanner.Split(splitFunc) + + tokens := [][]byte{} + for { + ok := scanner.Scan() + if !ok { + require.NoError(t, scanner.Err()) + break + } + + tokens = append(tokens, scanner.Bytes()) + } + + require.Equal(t, tc.tokens, tokens) + }) + } +} + func generatedByteSliceOfLength(length int) []byte { chars := []byte(`abcdefghijklmnopqrstuvwxyz`) newSlice := make([]byte, length) diff --git a/plugin/builtin/input/file/read_to_end.go b/plugin/builtin/input/file/read_to_end.go index 6a86d0e2a..ca93b765b 100644 --- a/plugin/builtin/input/file/read_to_end.go +++ b/plugin/builtin/input/file/read_to_end.go @@ -10,10 +10,24 @@ import ( "github.com/observiq/carbon/entry" "github.com/observiq/carbon/errors" "github.com/observiq/carbon/plugin/helper" + "go.uber.org/zap" + "golang.org/x/text/encoding" ) // ReadToEnd will read entries from a file and send them to the outputs of an input plugin -func ReadToEnd(ctx context.Context, path string, startOffset int64, lastSeenFileSize int64, messenger fileUpdateMessenger, splitFunc bufio.SplitFunc, filePathField, fileNameField *entry.Field, inputPlugin helper.InputPlugin, maxLogSize int) error { +func ReadToEnd( + ctx context.Context, + path string, + startOffset int64, + lastSeenFileSize int64, + messenger fileUpdateMessenger, + splitFunc bufio.SplitFunc, + filePathField *entry.Field, + fileNameField 
*entry.Field, + inputPlugin helper.InputPlugin, + maxLogSize int, + encoding encoding.Encoding, +) error { defer messenger.FinishedReading() select { @@ -56,11 +70,16 @@ func ReadToEnd(ctx context.Context, path string, startOffset int64, lastSeenFile } scanner.Split(scanFunc) + decoder := encoding.NewDecoder() + + // Make a large, reusable buffer for transforming + decodeBuffer := make([]byte, 16384) + // If we're not at the end of the file, and we haven't // advanced since last cycle, read the rest of the file as an entry defer func() { if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() { - readRemaining(ctx, file, pos, stat.Size(), messenger, inputPlugin, filePathField, fileNameField) + readRemaining(ctx, file, pos, stat.Size(), messenger, inputPlugin, filePathField, fileNameField, decoder, decodeBuffer) } }() @@ -79,8 +98,13 @@ func ReadToEnd(ctx context.Context, path string, startOffset int64, lastSeenFile return scanner.Err() } - message := scanner.Text() - e := inputPlugin.NewEntry(message) + decoder.Reset() + nDst, _, err := decoder.Transform(decodeBuffer, scanner.Bytes(), true) + if err != nil { + return err + } + + e := inputPlugin.NewEntry(string(decodeBuffer[:nDst])) if filePathField != nil { e.Set(*filePathField, path) } @@ -93,7 +117,7 @@ func ReadToEnd(ctx context.Context, path string, startOffset int64, lastSeenFile } // readRemaining will read the remaining characters in a file as a log entry. 
-func readRemaining(ctx context.Context, file *os.File, filePos int64, fileSize int64, messenger fileUpdateMessenger, inputPlugin helper.InputPlugin, filePathField, fileNameField *entry.Field) { +func readRemaining(ctx context.Context, file *os.File, filePos int64, fileSize int64, messenger fileUpdateMessenger, inputPlugin helper.InputPlugin, filePathField, fileNameField *entry.Field, encoder *encoding.Decoder, decodeBuffer []byte) { _, err := file.Seek(filePos, 0) if err != nil { inputPlugin.Errorf("failed to seek to read last log entry") @@ -106,8 +130,13 @@ func readRemaining(ctx context.Context, file *os.File, filePos int64, fileSize i inputPlugin.Errorf("failed to read trailing log") return } + encoder.Reset() + nDst, _, err := encoder.Transform(decodeBuffer, msgBuf, true) + if err != nil { + inputPlugin.Errorw("failed to decode trailing log", zap.Error(err)) + } - e := inputPlugin.NewEntry(string(msgBuf[:n])) + e := inputPlugin.NewEntry(string(decodeBuffer[:nDst])) if filePathField != nil { e.Set(*filePathField, file.Name()) }