This repository has been archived by the owner on May 25, 2022. It is now read-only.

Add possibility to force flush logs after certain period of time #216

Merged
merged 15 commits into from
Jul 15, 2021
Changes from 11 commits
7 changes: 6 additions & 1 deletion docs/operators/file_input.md
@@ -37,7 +37,12 @@ If set, the `multiline` configuration block instructs the `file_input` operator
The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
match either the beginning of a new log entry, or the end of a log entry.

- Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.
+ If using multiline, last log can sometimes be not flushed due to waiting for more content.
+ In order to forcefully flush last buffered log after certain period of time,
+ set `force_flush_period` option to [duration string](https://golang.org/pkg/time/#ParseDuration),
+ eg: `5s`, `1m`. It's by default `0s` which means, that no force flushing will be performed.
Member

For consistency, I think we should use helper.Duration instead of time.Duration. We can then link to https://github.com/open-telemetry/opentelemetry-log-collection/blob/main/docs/types/duration.md, which explains formatting sufficiently.

This is basically the same thing, but with a default unit of seconds, since nanoseconds is basically never reasonable in this context.

Member Author

I updated the documentation.


+ Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.

### File rotation

5 changes: 3 additions & 2 deletions operator/builtin/input/file/config.go
@@ -117,7 +117,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, err
}

- splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false)
+ // Ensure that multiline is buildable
+ _, err = c.Multiline.Build(encoding.Encoding, false)
if err != nil {
return nil, err
}
@@ -156,7 +157,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
- SplitFunc: splitFunc,
+ Multiline: c.Multiline,
Member

Perhaps this is just a naming issue, but I'll have a difficult time accepting that the splitfunc is being delegated to a Multiline struct. Put another way, we need a split func regardless of whether or not we are using multiline splitting.

I suspect that we can accomplish the same functionality as you are implementing, but by composing things in a better way.

Would it make sense to create a new struct which would pair the concept of a split func with the concept of a flush? Maybe something like:

type Splitter struct {
    splitFunc // can be simple like '\n' or multiline config
    flusher     // forces a flush based on timing
}
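
The composition proposed above could look roughly like the following sketch. Everything here is illustrative and not the merged API: the `flusher` interface, the `Split` method, `completeLines`, and `manualFlusher` are assumed names introduced for the example. The wrapped split function stays unaware of flushing, and `Splitter` only steps in when no token was produced.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// flusher decides whether buffered data should be emitted now.
// Hypothetical interface; timing logic would live behind it.
type flusher interface {
	ShouldFlush() bool
}

// Splitter pairs a split function with a flusher (illustrative only).
type Splitter struct {
	splitFunc bufio.SplitFunc // can be simple like '\n' or multiline
	flusher   flusher         // forces a flush based on some policy
}

// Split has the bufio.SplitFunc signature. It defers to the wrapped split
// function and emits the pending bytes only when that function produced
// nothing and the flusher asks for a flush.
func (s Splitter) Split(data []byte, atEOF bool) (int, []byte, error) {
	advance, token, err := s.splitFunc(data, atEOF)
	if err != nil || advance > 0 || token != nil {
		return advance, token, err
	}
	if len(data) > 0 && s.flusher.ShouldFlush() {
		return len(data), data, nil
	}
	return 0, nil, nil
}

// completeLines emits only newline-terminated lines and never flushes at EOF.
func completeLines(data []byte, atEOF bool) (int, []byte, error) {
	if i := bytes.IndexByte(data, '\n'); i >= 0 {
		return i + 1, data[:i], nil
	}
	return 0, nil, nil
}

// manualFlusher is a stand-in for real timing logic.
type manualFlusher bool

func (m manualFlusher) ShouldFlush() bool { return bool(m) }

func main() {
	pending := []byte("testlog2") // last line, no trailing newline

	waiting := Splitter{splitFunc: completeLines, flusher: manualFlusher(false)}
	_, token, _ := waiting.Split(pending, true)
	fmt.Printf("without flush: %q\n", token)

	forced := Splitter{splitFunc: completeLines, flusher: manualFlusher(true)}
	_, token, _ = forced.Split(pending, true)
	fmt.Printf("with flush: %q\n", token)
}
```

Because `Split` matches the `bufio.SplitFunc` signature, a method value such as `forced.Split` can be handed to anything that expects a split function.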

Member

After looking at this further, I'm not convinced we even need to do what I've suggested here. Please see my related comment on reader.go. I'll leave this comment in place as a possible alternative, but I think we need to start by justifying the need to alter splitFunc behavior in any way.

PollInterval: c.PollInterval.Raw(),
FilePathField: filePathField,
FileNameField: fileNameField,
20 changes: 16 additions & 4 deletions operator/builtin/input/file/file.go
@@ -15,7 +15,6 @@
package file

import (
- "bufio"
"bytes"
"context"
"encoding/json"
@@ -44,7 +43,7 @@ type InputOperator struct {
FilePathResolvedField entry.Field
FileNameResolvedField entry.Field
PollInterval time.Duration
- SplitFunc bufio.SplitFunc
+ Multiline helper.MultilineConfig
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}
@@ -323,7 +322,11 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
}

// If we don't match any previously known files, create a new reader from scratch
- newReader, err := f.NewReader(file.Name(), file, fp)
+ multiline, err := f.getMultiline()
+ if err != nil {
+ return nil, err
+ }
+ newReader, err := f.NewReader(file.Name(), file, fp, multiline)
if err != nil {
return nil, err
}
@@ -393,7 +396,11 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {
// Decode each of the known files
f.knownFiles = make([]*Reader, 0, knownFileCount)
for i := 0; i < knownFileCount; i++ {
- newReader, err := f.NewReader("", nil, nil)
+ multiline, err := f.getMultiline()
+ if err != nil {
+ return err
+ }
+ newReader, err := f.NewReader("", nil, nil, multiline)
if err != nil {
return err
}
@@ -405,3 +412,8 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {

return nil
}

+ // Build multiline using struct fields
+ func (f *InputOperator) getMultiline() (*helper.Multiline, error) {
+ return f.Multiline.Build(f.encoding.Encoding, false)
+ }
17 changes: 13 additions & 4 deletions operator/builtin/input/file/file_test.go
@@ -296,8 +296,10 @@ func TestStartAtEndNewFile(t *testing.T) {
// even if the file doesn't end in a newline
func TestNoNewline(t *testing.T) {
t.Parallel()
- t.Skip()
- operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
+ operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
+ cfg.Multiline = helper.NewMultilineConfig()
+ cfg.Multiline.ForceFlushPeriod = "1ms"
+ }, nil)

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog1\ntestlog2")
@@ -625,7 +627,11 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
tempCopy := openFile(t, temp.Name())
fp, err := operator.NewFingerprint(temp)
require.NoError(t, err)
- reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
+
+ multiline, err := operator.getMultiline()
+ require.NoError(t, err)
+
+ reader, err := operator.NewReader(temp.Name(), tempCopy, fp, multiline)
require.NoError(t, err)
defer reader.Close()

@@ -666,7 +672,10 @@ func TestFingerprintGrowsAndStops(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

- reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
+ multiline, err := operator.getMultiline()
+ require.NoError(t, err)
+
+ reader, err := operator.NewReader(temp.Name(), tempCopy, fp, multiline)
require.NoError(t, err)
defer reader.Close()

15 changes: 12 additions & 3 deletions operator/builtin/input/file/reader.go
@@ -26,6 +26,7 @@ import (
"golang.org/x/text/transform"

"github.com/open-telemetry/opentelemetry-log-collection/errors"
+ "github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

// File attributes contains information about file paths
@@ -70,11 +71,13 @@ type Reader struct {
decoder *encoding.Decoder
decodeBuffer []byte

+ multiline *helper.Multiline
+
*zap.SugaredLogger `json:"-"`
}

// NewReader creates a new file reader
- func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (*Reader, error) {
+ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, multiline *helper.Multiline) (*Reader, error) {
r := &Reader{
Fingerprint: fp,
file: file,
@@ -83,13 +86,14 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (
decoder: f.encoding.Encoding.NewDecoder(),
decodeBuffer: make([]byte, 1<<12),
fileAttributes: f.resolveFileAttributes(path),
+ multiline: multiline,
}
return r, nil
}

// Copy creates a deep copy of a Reader
func (r *Reader) Copy(file *os.File) (*Reader, error) {
- reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy())
+ reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.multiline)
if err != nil {
return nil, err
}
@@ -116,7 +120,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
return
}

- scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.fileInput.SplitFunc)
+ scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.multiline.SplitFunc)
Member

I believe this is the only place we are using the split func, so I'm not sure why it needs to be part of the multiline config.

We have two concepts here:

  1. How to split lines
  2. When to force a flush

I'm not convinced these are dependent on each other, or that either is necessarily a concern of multiline behavior. (Of course a split func may implement multiline splitting, but it does not always.)

Can we leave splitFunc as it was and just add a new config that controls flushing?

Member Author

@sumo-drosiek Jul 13, 2021

There are some relationships I was unsure how to resolve, so I put both pieces of functionality into one struct:

  • splitFunc needs to know when to force a flush (so a new structure is created and passed to the splitFunc)
  • every file should be flushed separately (so a splitFunc is created for every unique reader)

Regarding line splitting: of course it is not always multiline behavior, but from the user's perspective it is intuitive to use the multiline config for it. That suggests a Multiline struct underneath, rather than mixing a Splitter struct with the multiline config or introducing a splitter config.

I will move force flushing into a separate config, since most of its logic lives outside the splitFunc, but I'm not sure how to resolve the other dependencies in a clean way.

Member Author

I made some changes in the requested direction, but in the end I think a Splitter struct is a reasonable compromise.

You can see the current implementation without Splitter. There are two dependent entities, splitFunc and ForceFlush, and I feel they should be tied together by an additional struct. WDYT?

Member

I think consolidating into one struct makes sense

Member Author

Please see the latest code.


// Iterate over the tokenized file, emitting entries as we go
for {
@@ -131,8 +135,13 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
if err := getScannerError(scanner); err != nil {
r.Errorw("Failed during scan", zap.Error(err))
}

+ // Force flush eventually in next iteration
+ r.multiline.CheckAndFlush()
break
}
+ // Update information about last flush time
+ r.multiline.Flushed()

if err := r.emit(ctx, scanner.Bytes()); err != nil {
r.Error("Failed to emit entry", zap.Error(err))
5 changes: 3 additions & 2 deletions operator/builtin/input/tcp/tcp.go
@@ -98,7 +98,8 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

- splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true)
+ // Build multiline
+ multiline, err := c.Multiline.Build(encoding.Encoding, true)
if err != nil {
return nil, err
}
@@ -114,7 +115,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
encoding: encoding,
- splitFunc: splitFunc,
+ splitFunc: multiline.SplitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
},
5 changes: 3 additions & 2 deletions operator/builtin/input/udp/udp.go
@@ -81,7 +81,8 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

- splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true)
+ // Build multiline
+ multiline, err := c.Multiline.Build(encoding.Encoding, true)
if err != nil {
return nil, err
}
@@ -97,7 +98,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
buffer: make([]byte, MaxUDPSize),
addAttributes: c.AddAttributes,
encoding: encoding,
- splitFunc: splitFunc,
+ splitFunc: multiline.SplitFunc,
resolver: resolver,
}
return []operator.Operator{udpInput}, nil