Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Add possibility to force flush logs after certain period of time #216

Merged
merged 15 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/operators/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `exclude` | [] | A list of file glob patterns to exclude from reading |
| `poll_interval` | 200ms | The duration between filesystem polls |
| `multiline` | | A `multiline` configuration block. See below for details |
| `force_flush_period` | `0s` | Time since the last read of data from the file, after which the currently buffered log should be sent to the pipeline. Takes a [duration](../types/duration.md) as its value. Zero means wait for new data forever |
| `write_to` | `$body` | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `include_file_name` | `true` | Whether to add the file name as the attribute `file.name` |
Expand All @@ -37,7 +38,11 @@ If set, the `multiline` configuration block instructs the `file_input` operator
The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
match either the beginning of a new log entry, or the end of a log entry.

Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.
If using multiline, the last log entry can sometimes remain unflushed because the operator is waiting for more content.
To forcefully flush the last buffered log after a certain period of time,
use the `force_flush_period` option.

Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.

### File rotation

Expand Down
28 changes: 15 additions & 13 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewInputConfig(operatorID string) *InputConfig {
IncludeFilePath: false,
IncludeFileNameResolved: false,
IncludeFilePathResolved: false,
Splitter: helper.NewSplitterConfig(),
StartAt: "end",
FingerprintSize: defaultFingerprintSize,
MaxLogSize: defaultMaxLogSize,
Expand All @@ -58,17 +59,17 @@ type InputConfig struct {
Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
Splitter helper.SplitterConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
}

// Build will build a file input operator from the supplied configuration
Expand Down Expand Up @@ -117,7 +118,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false)
// Ensure that multiline is buildable
_, err = c.Splitter.Build(encoding.Encoding, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,13 +158,13 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
SplitFunc: splitFunc,
PollInterval: c.PollInterval.Raw(),
FilePathField: filePathField,
FileNameField: fileNameField,
FilePathResolvedField: filePathResolvedField,
FileNameResolvedField: fileNameResolvedField,
startAtBeginning: startAtBeginning,
Splitter: c.Splitter,
queuedMatches: make([]string, 0),
encoding: encoding,
firstCheck: true,
Expand Down
56 changes: 32 additions & 24 deletions operator/builtin/input/file/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,9 @@ func TestUnmarshal(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
newMulti := helper.MultilineConfig{}
newMulti.LineStartPattern = "Start"
cfg.Multiline = newMulti
newSplit := helper.NewSplitterConfig()
newSplit.Multiline.LineStartPattern = "Start"
cfg.Splitter = newSplit
return cfg
}(),
},
Expand All @@ -400,9 +400,9 @@ func TestUnmarshal(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
newMulti := helper.MultilineConfig{}
newMulti.LineStartPattern = "%"
cfg.Multiline = newMulti
newSplit := helper.NewSplitterConfig()
newSplit.Multiline.LineStartPattern = "%"
cfg.Splitter = newSplit
return cfg
}(),
},
Expand All @@ -411,9 +411,9 @@ func TestUnmarshal(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
newMulti := helper.MultilineConfig{}
newMulti.LineEndPattern = "Start"
cfg.Multiline = newMulti
newSplit := helper.NewSplitterConfig()
newSplit.Multiline.LineEndPattern = "Start"
cfg.Splitter = newSplit
return cfg
}(),
},
Expand All @@ -422,9 +422,9 @@ func TestUnmarshal(t *testing.T) {
ExpectErr: false,
Expect: func() *InputConfig {
cfg := defaultCfg()
newMulti := helper.MultilineConfig{}
newMulti.LineEndPattern = "%"
cfg.Multiline = newMulti
newSplit := helper.NewSplitterConfig()
newSplit.Multiline.LineEndPattern = "%"
cfg.Splitter = newSplit
return cfg
}(),
},
Expand Down Expand Up @@ -564,7 +564,8 @@ func TestBuild(t *testing.T) {
{
"MultilineConfiguredStartAndEndPatterns",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineEndPattern: "Exists",
LineStartPattern: "Exists",
}
Expand All @@ -575,7 +576,8 @@ func TestBuild(t *testing.T) {
{
"MultilineConfiguredStartPattern",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineStartPattern: "START.*",
}
},
Expand All @@ -585,7 +587,8 @@ func TestBuild(t *testing.T) {
{
"MultilineConfiguredEndPattern",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineEndPattern: "END.*",
}
},
Expand All @@ -603,7 +606,8 @@ func TestBuild(t *testing.T) {
{
"LineStartAndEnd",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineStartPattern: ".*",
LineEndPattern: ".*",
}
Expand All @@ -614,15 +618,17 @@ func TestBuild(t *testing.T) {
{
"NoLineStartOrEnd",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{}
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{}
},
require.NoError,
func(t *testing.T, f *InputOperator) {},
},
{
"InvalidLineStartRegex",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineStartPattern: "(",
}
},
Expand All @@ -632,7 +638,8 @@ func TestBuild(t *testing.T) {
{
"InvalidLineEndRegex",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineEndPattern: "(",
}
},
Expand Down Expand Up @@ -673,7 +680,8 @@ func NewTestInputConfig() *InputConfig {
cfg.WriteTo = entry.NewBodyField([]string{}...)
cfg.Include = []string{"i1", "i2"}
cfg.Exclude = []string{"e1", "e2"}
cfg.Multiline = helper.MultilineConfig{
cfg.Splitter = helper.NewSplitterConfig()
cfg.Splitter.Multiline = helper.MultilineConfig{
LineStartPattern: "start",
LineEndPattern: "end",
}
Expand All @@ -695,8 +703,8 @@ func TestMapStructureDecodeConfigWithHook(t *testing.T) {
"exclude": expect.Exclude,
"poll_interval": 0.2,
"multiline": map[string]interface{}{
"line_start_pattern": expect.Multiline.LineStartPattern,
"line_end_pattern": expect.Multiline.LineEndPattern,
"line_start_pattern": expect.Splitter.Multiline.LineStartPattern,
"line_end_pattern": expect.Splitter.Multiline.LineEndPattern,
},
"include_file_name": true,
"include_file_path": false,
Expand Down Expand Up @@ -731,8 +739,8 @@ func TestMapStructureDecodeConfig(t *testing.T) {
"Duration": 200 * 1000 * 1000,
},
"multiline": map[string]interface{}{
"line_start_pattern": expect.Multiline.LineStartPattern,
"line_end_pattern": expect.Multiline.LineEndPattern,
"line_start_pattern": expect.Splitter.Multiline.LineStartPattern,
"line_end_pattern": expect.Splitter.Multiline.LineEndPattern,
},
"include_file_name": true,
"include_file_path": false,
Expand Down
20 changes: 16 additions & 4 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package file

import (
"bufio"
"bytes"
"context"
"encoding/json"
Expand Down Expand Up @@ -44,7 +43,7 @@ type InputOperator struct {
FilePathResolvedField entry.Field
FileNameResolvedField entry.Field
PollInterval time.Duration
SplitFunc bufio.SplitFunc
Splitter helper.SplitterConfig
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}
Expand Down Expand Up @@ -323,7 +322,11 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
}

// If we don't match any previously known files, create a new reader from scratch
newReader, err := f.NewReader(file.Name(), file, fp)
splitter, err := f.getMultiline()
if err != nil {
return nil, err
}
newReader, err := f.NewReader(file.Name(), file, fp, splitter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -393,7 +396,11 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {
// Decode each of the known files
f.knownFiles = make([]*Reader, 0, knownFileCount)
for i := 0; i < knownFileCount; i++ {
newReader, err := f.NewReader("", nil, nil)
splitter, err := f.getMultiline()
if err != nil {
return err
}
newReader, err := f.NewReader("", nil, nil, splitter)
if err != nil {
return err
}
Expand All @@ -405,3 +412,8 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error {

return nil
}

// getMultiline builds and returns a *helper.Splitter from the operator's
// Splitter configuration, using the operator's configured encoding. It returns
// a non-nil error if the splitter configuration cannot be built (e.g. an
// invalid multiline pattern). The second Build argument is false here —
// presumably it disables flushing at EOF, in contrast to the tcp input which
// passes true; confirm against helper.SplitterConfig.Build.
func (f *InputOperator) getMultiline() (*helper.Splitter, error) {
return f.Splitter.Build(f.encoding.Encoding, false)
}
17 changes: 13 additions & 4 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,10 @@ func TestStartAtEndNewFile(t *testing.T) {
// even if the file doesn't end in a newline
func TestNoNewline(t *testing.T) {
t.Parallel()
t.Skip()
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
cfg.Splitter = helper.NewSplitterConfig()
cfg.Splitter.Flusher.Period.Duration = time.Nanosecond
}, nil)

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog1\ntestlog2")
Expand Down Expand Up @@ -625,7 +627,11 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
tempCopy := openFile(t, temp.Name())
fp, err := operator.NewFingerprint(temp)
require.NoError(t, err)
reader, err := operator.NewReader(temp.Name(), tempCopy, fp)

splitter, err := operator.getMultiline()
require.NoError(t, err)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitter)
require.NoError(t, err)
defer reader.Close()

Expand Down Expand Up @@ -666,7 +672,10 @@ func TestFingerprintGrowsAndStops(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
splitter, err := operator.getMultiline()
require.NoError(t, err)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitter)
require.NoError(t, err)
defer reader.Close()

Expand Down
15 changes: 12 additions & 3 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"golang.org/x/text/transform"

"github.com/open-telemetry/opentelemetry-log-collection/errors"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

// File attributes contains information about file paths
Expand Down Expand Up @@ -70,11 +71,13 @@ type Reader struct {
decoder *encoding.Decoder
decodeBuffer []byte

splitter *helper.Splitter

*zap.SugaredLogger `json:"-"`
}

// NewReader creates a new file reader
func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (*Reader, error) {
func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, splitter *helper.Splitter) (*Reader, error) {
r := &Reader{
Fingerprint: fp,
file: file,
Expand All @@ -83,13 +86,14 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (
decoder: f.encoding.Encoding.NewDecoder(),
decodeBuffer: make([]byte, 1<<12),
fileAttributes: f.resolveFileAttributes(path),
splitter: splitter,
}
return r, nil
}

// Copy creates a deep copy of a Reader
func (r *Reader) Copy(file *os.File) (*Reader, error) {
reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy())
reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.splitter)
if err != nil {
return nil, err
}
Expand All @@ -116,7 +120,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
return
}

scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.fileInput.SplitFunc)
scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.splitter.SplitFunc)

// Iterate over the tokenized file, emitting entries as we go
for {
Expand All @@ -131,8 +135,13 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
if err := getScannerError(scanner); err != nil {
r.Errorw("Failed during scan", zap.Error(err))
}

// Force flush eventually in next iteration
r.splitter.CheckAndFlush()
break
}
// Update information about last flush time
r.splitter.Flushed()

if err := r.emit(ctx, scanner.Bytes()); err != nil {
r.Error("Failed to emit entry", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true)
// Build multiline
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil)
if err != nil {
return nil, err
}
Expand Down
Loading