Encoding buffer resizing #58

Merged 5 commits on Jul 28, 2020
24 changes: 12 additions & 12 deletions operator/builtin/input/file/file.go
@@ -230,6 +230,8 @@ func (f *InputOperator) Start() error {
 	f.wg.Add(1)
 	go func() {
 		defer f.wg.Done()
+		defer f.syncKnownFiles()
+		defer f.drainMessages()
 
 		globTicker := time.NewTicker(f.PollInterval)
 		defer globTicker.Stop()
@@ -242,9 +244,6 @@
 		for {
 			select {
 			case <-ctx.Done():
-				f.drainMessages()
-				f.readerWg.Wait()
-				f.syncKnownFiles()
 				return
 			case <-globTicker.C:
 				matches := getMatches(f.Include, f.Exclude)
@@ -256,8 +255,10 @@
 				}
 				f.syncKnownFiles()
 				firstCheck = false
-			case message := <-f.fileUpdateChan:
-				f.updateFile(message)
+			case message, ok := <-f.fileUpdateChan:
+				if ok {
+					f.updateFile(message)
+				}
 			}
 		}
 	}()
@@ -269,7 +270,7 @@
 func (f *InputOperator) Stop() error {
 	f.cancel()
 	f.wg.Wait()
-	f.syncKnownFiles()
+	f.fileUpdateChan = make(chan fileUpdateMessage)
 	f.knownFiles = nil
 	return nil
 }
@@ -314,6 +315,7 @@ func (f *InputOperator) checkFile(ctx context.Context, path string, firstCheck bool) {
 	go func(ctx context.Context, path string, offset, lastSeenSize int64) {
 		defer f.readerWg.Done()
 		messenger := f.newFileUpdateMessenger(path)
+		defer messenger.FinishedReading()
 		err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputOperator, f.MaxLogSize, f.encoding)
 		if err != nil {
 			f.Warnw("Failed to read log file", zap.Error(err))
@@ -386,19 +388,17 @@ func (f *InputOperator) updateFile(message fileUpdateMessage) {
 }
 
 func (f *InputOperator) drainMessages() {
-	done := make(chan struct{})
 	go func() {
 		f.readerWg.Wait()
-		close(done)
+		close(f.fileUpdateChan)
 	}()
 
 	for {
-		select {
-		case <-done:
+		message, ok := <-f.fileUpdateChan
+		if !ok {
 			return
-		case message := <-f.fileUpdateChan:
-			f.updateFile(message)
 		}
+		f.updateFile(message)
 	}
 }

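Worth noting about the file.go change: shutdown work now runs as deferred calls inside the poller goroutine, and drainMessages exits by closing f.fileUpdateChan once every reader has finished, instead of racing a separate done channel against pending updates. Below is a minimal, self-contained sketch of that close-to-drain pattern; the names and the three fake producers are hypothetical, not code from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	updates := make(chan string)
	var readers sync.WaitGroup

	// Producers: stand-ins for the per-file reader goroutines.
	for i := 0; i < 3; i++ {
		readers.Add(1)
		go func(id int) {
			defer readers.Done()
			updates <- fmt.Sprintf("file-%d.log", id)
		}(i)
	}

	// Close the channel only once every producer has exited,
	// mirroring f.readerWg.Wait() followed by close(f.fileUpdateChan).
	go func() {
		readers.Wait()
		close(updates)
	}()

	// Drain until closed: ok turns false only after the channel is
	// closed and empty, so no in-flight update is dropped at shutdown.
	for {
		msg, ok := <-updates
		if !ok {
			return
		}
		fmt.Println("updated", msg)
	}
}
```

Receiving with the two-value form makes channel closure unambiguous, which is why the old select over a separate done channel is no longer needed.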
33 changes: 33 additions & 0 deletions operator/builtin/input/file/positional_scanner.go
@@ -0,0 +1,33 @@
+package file
+
+import (
+	"bufio"
+	"io"
+)
+
+type PositionalScanner struct {
+	pos int64
+	*bufio.Scanner
+}
+
+func NewPositionalScanner(r io.Reader, maxLogSize int, startOffset int64, splitFunc bufio.SplitFunc) *PositionalScanner {
+	ps := &PositionalScanner{
+		pos:     startOffset,
+		Scanner: bufio.NewScanner(r),
+	}
+
+	buf := make([]byte, 0, 16384)
+	ps.Scanner.Buffer(buf, maxLogSize)
+
+	scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
+		advance, token, err = splitFunc(data, atEOF)
+		ps.pos += int64(advance)
+		return
+	}
+	ps.Scanner.Split(scanFunc)
+	return ps
+}
+
+func (ps *PositionalScanner) Pos() int64 {
+	return ps.pos
+}
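For context, here is one way the new type could be exercised, for example from a _test.go file in the same package. The example function, sample input, and 1024-byte size cap are assumptions; only NewPositionalScanner, Pos, and the embedded bufio.Scanner methods come from the file above.

```go
package file

import (
	"bufio"
	"fmt"
	"strings"
)

// ExamplePositionalScanner tokenizes an in-memory reader line by line
// and prints the byte offset after each token.
func ExamplePositionalScanner() {
	input := strings.NewReader("first\nsecond\nthird\n")
	scanner := NewPositionalScanner(input, 1024, 0, bufio.ScanLines)

	for scanner.Scan() {
		// Pos() includes the newline consumed by ScanLines, so it is
		// the offset a restarted reader should resume from.
		fmt.Printf("%s ends at offset %d\n", scanner.Text(), scanner.Pos())
	}
	// Output:
	// first ends at offset 6
	// second ends at offset 13
	// third ends at offset 19
}
```

Because the wrapping split function adds advance to pos on every call, Pos() always points one byte past the last consumed delimiter, even for split functions that skip data without emitting a token.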
98 changes: 43 additions & 55 deletions operator/builtin/input/file/read_to_end.go
@@ -12,6 +12,7 @@ import (
 	"github.com/observiq/carbon/operator/helper"
 	"go.uber.org/zap"
 	"golang.org/x/text/encoding"
+	"golang.org/x/text/transform"
 )
 
 // ReadToEnd will read entries from a file and send them to the outputs of an input operator
@@ -28,8 +29,6 @@ func ReadToEnd(
 	maxLogSize int,
 	encoding encoding.Encoding,
 ) error {
-	defer messenger.FinishedReading()
-
 	select {
 	case <-ctx.Done():
 		return nil
@@ -59,30 +58,34 @@
 		return fmt.Errorf("seek file: %s", err)
 	}
 
-	scanner := bufio.NewScanner(file)
-	buf := make([]byte, 0, 16384)
-	scanner.Buffer(buf, maxLogSize)
-	pos := startOffset
-	scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
-		advance, token, err = splitFunc(data, atEOF)
-		pos += int64(advance)
-		return
-	}
-	scanner.Split(scanFunc)
-
-	decoder := encoding.NewDecoder()
+	scanner := NewPositionalScanner(file, maxLogSize, startOffset, splitFunc)
+
+	// Make a large, reusable buffer for transforming
+	decoder := encoding.NewDecoder()
 	decodeBuffer := make([]byte, 16384)
 
-	// If we're not at the end of the file, and we haven't
-	// advanced since last cycle, read the rest of the file as an entry
-	defer func() {
-		if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() {
-			readRemaining(ctx, file, pos, stat.Size(), messenger, inputOperator, filePathField, fileNameField, decoder, decodeBuffer)
+	emit := func(msgBuf []byte) {
+		decoder.Reset()
+		var nDst int
+		for {
+			nDst, _, err = decoder.Transform(decodeBuffer, msgBuf, true)
+			if err != nil && err == transform.ErrShortDst {
+				decodeBuffer = make([]byte, len(decodeBuffer)*2)
+				continue
+			} else if err != nil {
+				inputOperator.Errorw("failed to transform encoding", zap.Error(err))
+				return
+			}
+			break
 		}
-	}()
+
+		e := inputOperator.NewEntry(string(decodeBuffer[:nDst]))
+		e.Set(filePathField, path)
+		e.Set(fileNameField, filepath.Base(file.Name()))
+		inputOperator.Write(ctx, e)
+	}
 
+	// Iterate over the tokenized file, emitting entries as we go
 	for {
 		select {
 		case <-ctx.Done():
@@ -94,47 +97,32 @@
 		if !ok {
 			if err := scanner.Err(); err == bufio.ErrTooLong {
 				return errors.NewError("log entry too large", "increase max_log_size or ensure that multiline regex patterns terminate")
+			} else if err != nil {
+				return errors.Wrap(err, "scanner error")
 			}
-			return scanner.Err()
+			break
 		}
 
-		decoder.Reset()
-		nDst, _, err := decoder.Transform(decodeBuffer, scanner.Bytes(), true)
-		if err != nil {
-			return err
-		}
-
-		e := inputOperator.NewEntry(string(decodeBuffer[:nDst]))
-		e.Set(filePathField, path)
-		e.Set(fileNameField, filepath.Base(file.Name()))
-		inputOperator.Write(ctx, e)
-		messenger.SetOffset(pos)
+		emit(scanner.Bytes())
+		messenger.SetOffset(scanner.Pos())
 	}
-}
 
-// readRemaining will read the remaining characters in a file as a log entry.
-func readRemaining(ctx context.Context, file *os.File, filePos int64, fileSize int64, messenger fileUpdateMessenger, inputOperator helper.InputOperator, filePathField, fileNameField entry.Field, encoder *encoding.Decoder, decodeBuffer []byte) {
-	_, err := file.Seek(filePos, 0)
-	if err != nil {
-		inputOperator.Errorf("failed to seek to read last log entry")
-		return
-	}
+	// If we're not at the end of the file, and we haven't
+	// advanced since last cycle, read the rest of the file as an entry
+	if scanner.Pos() < stat.Size() && scanner.Pos() == startOffset && lastSeenFileSize == stat.Size() {
+		_, err := file.Seek(scanner.Pos(), 0)
+		if err != nil {
+			return errors.Wrap(err, "seeking for trailing entry")
+		}
 
-	msgBuf := make([]byte, fileSize-filePos)
-	n, err := file.Read(msgBuf)
-	if err != nil {
-		inputOperator.Errorf("failed to read trailing log")
-		return
-	}
-	encoder.Reset()
-	nDst, _, err := encoder.Transform(decodeBuffer, msgBuf, true)
-	if err != nil {
-		inputOperator.Errorw("failed to decode trailing log", zap.Error(err))
+		msgBuf := make([]byte, stat.Size()-scanner.Pos())
+		n, err := file.Read(msgBuf)
+		if err != nil {
+			return errors.Wrap(err, "reading trailing entry")
+		}
+		emit(msgBuf[:n])
+		messenger.SetOffset(scanner.Pos() + int64(n))
 	}
 
-	e := inputOperator.NewEntry(string(decodeBuffer[:nDst]))
-	e.Set(filePathField, file.Name())
-	e.Set(fileNameField, filepath.Base(file.Name()))
-	inputOperator.Write(ctx, e)
-	messenger.SetOffset(filePos + int64(n))
+	return nil
 }
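The change that gives this PR its name is the resize-and-retry loop around decoder.Transform in the new emit closure: when golang.org/x/text reports transform.ErrShortDst, the destination buffer is doubled and the transform retried, so one reusable buffer grows to fit any entry instead of failing on large logs. A standalone sketch of the same strategy follows; the decode helper, the UTF-16 decoder choice, and the sample bytes are illustrative assumptions, not code from the PR.

```go
package main

import (
	"fmt"

	"golang.org/x/text/encoding/unicode"
	"golang.org/x/text/transform"
)

// decode converts src to UTF-8 using dst as reusable scratch space,
// doubling dst whenever the decoder reports transform.ErrShortDst.
// It returns the decoded bytes, the (possibly grown) scratch buffer,
// and any non-recoverable error.
func decode(dst, src []byte) ([]byte, []byte, error) {
	decoder := unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder()
	for {
		decoder.Reset()
		nDst, _, err := decoder.Transform(dst, src, true)
		if err == transform.ErrShortDst {
			// Output did not fit: grow the buffer and retry from scratch.
			dst = make([]byte, len(dst)*2)
			continue
		}
		if err != nil {
			return nil, dst, err
		}
		return dst[:nDst], dst, nil
	}
}

func main() {
	src := []byte{0x68, 0x00, 0x69, 0x00}            // "hi" in UTF-16LE
	out, scratch, err := decode(make([]byte, 1), src) // deliberately undersized
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded %q; scratch buffer grew to %d bytes\n", out, len(scratch))
}
```

Because the grown buffer is kept (here returned to the caller; in the PR, captured by the emit closure), the doubling cost is paid at most a logarithmic number of times over the life of the reader.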