Skip to content

Commit

Permalink
Add support for alternative encodings
Browse files Browse the repository at this point in the history
This commit adds support for all encodings supported by the
`x/text/encoding` package. This can be configured with the new
`encoding` parameter of the file input.

A limitation of the current implementation is that it does not respect
the BOM in UTF16, so users will have to explicitly choose `utf-16le` or
`utf-16be`.
  • Loading branch information
camdencheek committed Jul 20, 2020
1 parent 89dbd2e commit 6b0fccc
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
- Add support for multiple encodings in the file input plugin

## [0.9.2] - 2020-07-13
### Added
- Link `carbon` into `/usr/local/bin` so it's available on most users' `PATH` ([PR28](https://github.com/observIQ/carbon/pull/28))
Expand Down
15 changes: 15 additions & 0 deletions docs/plugins/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The `file_input` plugin reads logs from files. It will place the lines read into
| `file_path_field` | | A [field](/docs/types/field.md) that will be set to the path of the file the entry was read from |
| `file_name_field` | | A [field](/docs/types/field.md) that will be set to the name of the file the entry was read from |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
| `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options |
| `max_log_size` | 1048576 | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. |

Note that by default, no logs will be read unless the monitored file is actively being written to because `start_at` defaults to `end`.
Expand All @@ -27,6 +28,20 @@ If set, the `multiline` configuration block instructs the `file_input` plugin to
The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
match either the beginning of a new log entry, or the end of a log entry.

### Supported encodings

| Key | Description |
| --- | --- |
| `nop` | No encoding validation. Treats the file as a stream of raw bytes |
| `utf-8` | UTF-8 encoding |
| `utf-16le` | UTF-16 encoding with little-endian byte order |
| `utf-16be` | UTF-16 encoding with big-endian byte order |
| `ascii` | ASCII encoding |
| `big5` | The Big5 Chinese character encoding |

Other less common encodings are supported on a best-effort basis. See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml) for other encodings available.


### Example Configurations

#### Simple file input
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
golang.org/x/net v0.0.0-20200301022130-244492dfa37a // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sys v0.0.0-20200513112337-417ce2331b5c // indirect
golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20200513201620-d5fe73897c97 // indirect
gonum.org/v1/gonum v0.6.2
google.golang.org/api v0.20.0
Expand Down
51 changes: 46 additions & 5 deletions plugin/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/observiq/carbon/entry"
"github.com/observiq/carbon/plugin"
"github.com/observiq/carbon/plugin/helper"
"go.uber.org/zap"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/ianaindex"
"golang.org/x/text/encoding/unicode"
)

func init() {
Expand All @@ -33,10 +37,11 @@ type InputConfig struct {

PollInterval *plugin.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
FilePathField *entry.Field `json:"file_path_field,omitempty" yaml:"file_path_field,omitempty"`
FilePathField *entry.Field `json:"file_path_field,omitempty" yaml:"file_path_field,omitempty"`
FileNameField *entry.Field `json:"file_name_field,omitempty" yaml:"file_name_field,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"`
}

// MultilineConfig is the configuration of a multiline operation
Expand Down Expand Up @@ -72,7 +77,12 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) {
}
}

splitFunc, err := c.getSplitFunc()
encoding, err := lookupEncoding(c.Encoding)
if err != nil {
return nil, err
}

splitFunc, err := c.getSplitFunc(encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -107,6 +117,7 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) {
fileUpdateChan: make(chan fileUpdateMessage, 10),
fingerprintBytes: 1000,
startAtBeginning: startAtBeginning,
encoding: encoding,
}

if c.MaxLogSize == 0 {
Expand All @@ -118,11 +129,39 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) {
return plugin, nil
}

// encodingOverrides maps encoding names to the encodings that should be used
// for them. lookupEncoding consults this table, using the lower-cased name,
// before falling back to the IANA index, so keys here must be lower case.
var encodingOverrides = map[string]encoding.Encoding{
	// No encoding configured, or "nop" requested explicitly.
	"":    encoding.Nop,
	"nop": encoding.Nop,

	// UTF-8 and ASCII aliases; ASCII input is handled by the UTF-8 encoding.
	"utf8":     unicode.UTF8,
	"ascii":    unicode.UTF8,
	"us-ascii": unicode.UTF8,

	// UTF-16 without an explicit byte order is read as little-endian, and
	// any byte order mark is ignored.
	"utf16":  unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
	"utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
}

// lookupEncoding resolves the configured encoding name to an
// encoding.Encoding. Names listed in encodingOverrides (matched after
// lower-casing) win; anything else is looked up in the IANA index. An error
// is returned when the index rejects the name or yields no encoding for it.
func lookupEncoding(enc string) (encoding.Encoding, error) {
	if override, found := encodingOverrides[strings.ToLower(enc)]; found {
		return override, nil
	}

	resolved, err := ianaindex.IANA.Encoding(enc)
	switch {
	case err != nil:
		return nil, fmt.Errorf("unsupported encoding '%s'", enc)
	case resolved == nil:
		// The index returned no error but also no usable encoding.
		return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc)
	}
	return resolved, nil
}

// getSplitFunc will return the split function associated with the configured mode.
func (c InputConfig) getSplitFunc() (bufio.SplitFunc, error) {
func (c InputConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) {
var splitFunc bufio.SplitFunc
if c.Multiline == nil {
splitFunc = NewNewlineSplitFunc()
var err error
splitFunc, err = NewNewlineSplitFunc(encoding)
if err != nil {
return nil, err
}
} else {
definedLineEndPattern := c.Multiline.LineEndPattern != ""
definedLineStartPattern := c.Multiline.LineStartPattern != ""
Expand Down Expand Up @@ -168,6 +207,8 @@ type InputPlugin struct {
fileUpdateChan chan fileUpdateMessage
fingerprintBytes int64

encoding encoding.Encoding

wg *sync.WaitGroup
readerWg *sync.WaitGroup
cancel context.CancelFunc
Expand Down Expand Up @@ -273,7 +314,7 @@ func (f *InputPlugin) checkFile(ctx context.Context, path string, firstCheck boo
go func(ctx context.Context, path string, offset, lastSeenSize int64) {
defer f.readerWg.Done()
messenger := f.newFileUpdateMessenger(path)
err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputPlugin, f.MaxLogSize)
err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputPlugin, f.MaxLogSize, f.encoding)
if err != nil {
f.Warnw("Failed to read log file", zap.Error(err))
}
Expand Down
90 changes: 90 additions & 0 deletions plugin/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/text/encoding/unicode"
)

func newTestFileSource(t *testing.T) (*InputPlugin, chan *entry.Entry) {
Expand All @@ -46,6 +47,7 @@ func newTestFileSource(t *testing.T) (*InputPlugin, chan *entry.Entry) {
},
SplitFunc: bufio.ScanLines,
PollInterval: 50 * time.Millisecond,
encoding: unicode.UTF8,
persist: helper.NewScopedDBPersister(db, "testfile"),
runningFiles: make(map[string]struct{}),
knownFiles: make(map[string]*knownFileInfo),
Expand Down Expand Up @@ -746,3 +748,91 @@ func expectNoMessages(t *testing.T, c chan *entry.Entry) {
case <-time.After(200 * time.Millisecond):
}
}

// TestEncodings writes raw bytes to a log file for each configured encoding
// and checks that the entries emitted by the file input carry the expected
// bytes, one entry per expected line.
func TestEncodings(t *testing.T) {
	t.Parallel()

	type encodingCase struct {
		name     string
		contents []byte
		encoding string
		expected [][]byte
	}

	cases := []encodingCase{
		{
			name:     "Nop",
			contents: []byte{0xc5, '\n'},
			encoding: "",
			expected: [][]byte{{0xc5}},
		},
		{
			// The invalid byte is expected to come out as U+FFFD (EF BF BD).
			name:     "InvalidUTFReplacement",
			contents: []byte{0xc5, '\n'},
			encoding: "utf8",
			expected: [][]byte{{0xef, 0xbf, 0xbd}},
		},
		{
			name:     "ValidUTF8",
			contents: []byte("foo\n"),
			encoding: "utf8",
			expected: [][]byte{[]byte("foo")},
		},
		{
			name:     "ChineseCharacter",
			contents: []byte{230, 138, 152, '\n'}, // 折\n
			encoding: "utf8",
			expected: [][]byte{{230, 138, 152}},
		},
		{
			name:     "SmileyFaceUTF16",
			contents: []byte{216, 61, 222, 0, 0, 10}, // 😀\n
			encoding: "utf-16be",
			expected: [][]byte{{240, 159, 152, 128}},
		},
		{
			name:     "SmileyFaceNewlineUTF16",
			contents: []byte{216, 61, 222, 0, 0, 10, 0, 102, 0, 111, 0, 111}, // 😀\nfoo
			encoding: "utf-16be",
			expected: [][]byte{{240, 159, 152, 128}, {102, 111, 111}},
		},
		{
			name:     "SmileyFaceNewlineUTF16LE",
			contents: []byte{61, 216, 0, 222, 10, 0, 102, 0, 111, 0, 111, 0}, // 😀\nfoo
			encoding: "utf-16le",
			expected: [][]byte{{240, 159, 152, 128}, {102, 111, 111}},
		},
		{
			name:     "ChineseCharacterBig5",
			contents: []byte{167, 233, 10}, // 折\n
			encoding: "big5",
			expected: [][]byte{{230, 138, 152}},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Lay down the input file before the source starts.
			logPath := filepath.Join(testutil.NewTempDir(t), "in.log")
			require.NoError(t, ioutil.WriteFile(logPath, tc.contents, 0777))

			source, entries := newTestFileSource(t)
			source.Include = []string{logPath}

			// Configure the source with the encoding under test and a
			// newline splitter built for that same encoding.
			enc, err := lookupEncoding(tc.encoding)
			require.NoError(t, err)
			source.encoding = enc

			splitFunc, err := NewNewlineSplitFunc(source.encoding)
			require.NoError(t, err)
			source.SplitFunc = splitFunc
			require.NotNil(t, source.encoding)

			require.NoError(t, source.Start())

			// Entries must arrive in order, each within one second.
			for _, want := range tc.expected {
				select {
				case got := <-entries:
					require.Equal(t, want, []byte(got.Record.(string)))
				case <-time.After(time.Second):
					require.FailNow(t, "Timed out waiting for entry to be read")
				}
			}
		})
	}
}
35 changes: 25 additions & 10 deletions plugin/builtin/input/file/line_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bufio"
"bytes"
"regexp"

"golang.org/x/text/encoding"
)

// NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
Expand Down Expand Up @@ -67,27 +69,40 @@ func NewLineEndSplitFunc(re *regexp.Regexp) bufio.SplitFunc {

// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
// never returns a token using EOF as a terminator
func NewNewlineSplitFunc() bufio.SplitFunc {
func NewNewlineSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) {
newline, err := encodedNewline(encoding)
if err != nil {
return nil, err
}

carriageReturn, err := encodedCarriageReturn(encoding)
if err != nil {
return nil, err
}

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

if i := bytes.IndexByte(data, '\n'); i >= 0 {
if i := bytes.Index(data, newline); i >= 0 {
// We have a full newline-terminated line.
return i + 1, dropCR(data[0:i]), nil
return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil
}

// Request more data.
return 0, nil, nil
}
}, nil
}

// dropCR drops a terminal \r from the data.
func dropCR(data []byte) []byte {
if len(data) > 0 && data[len(data)-1] == '\r' {
return data[0 : len(data)-1]
}
// encodedNewline returns the bytes produced by running a single '\n' through
// the given encoding's encoder, along with any error the encoder reports.
func encodedNewline(encoding encoding.Encoding) ([]byte, error) {
	// Scratch space for the encoded character; 10 bytes are reserved.
	var scratch [10]byte
	written, _, err := encoding.NewEncoder().Transform(scratch[:], []byte("\n"), true)
	return scratch[:written], err
}

return data
// encodedCarriageReturn returns the bytes produced by running a single '\r'
// through the given encoding's encoder, along with any error the encoder
// reports.
func encodedCarriageReturn(encoding encoding.Encoding) ([]byte, error) {
	// Scratch space for the encoded character; 10 bytes are reserved.
	var scratch [10]byte
	written, _, err := encoding.NewEncoder().Transform(scratch[:], []byte("\r"), true)
	return scratch[:written], err
}
Loading

0 comments on commit 6b0fccc

Please sign in to comment.