Skip to content

Commit

Permalink
[pkg/stanza] Move flush state into reader package (#27843)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Oct 20, 2023
1 parent d818cfe commit 24f1c59
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 113 deletions.
27 changes: 27 additions & 0 deletions .chloggen/pkg-stanza-metadata-flush.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate 'flush.WithPeriod'. Use 'flush.WithFunc' instead.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27843]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
14 changes: 14 additions & 0 deletions pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ func BenchmarkFileInput(b *testing.B) {
return cfg
},
},
{
name: "NoFlush",
paths: []string{
"file0.log",
},
config: func() *Config {
cfg := NewConfig()
cfg.Include = []string{
"file*.log",
}
cfg.FlushPeriod = 0
return cfg
},
},
}

for _, bench := range cases {
Expand Down
22 changes: 9 additions & 13 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -113,23 +112,18 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager,
trimFunc = c.TrimConfig.Func()
}

// Ensure that splitter is buildable
factory := splitter.NewFactory(splitFunc, trimFunc, c.FlushPeriod, int(c.MaxLogSize))
return c.buildManager(logger, emit, factory)
return c.buildManager(logger, emit, splitFunc, trimFunc)
}

// BuildWithSplitFunc will build a file input operator with customized splitFunc function
func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback, splitFunc bufio.SplitFunc) (*Manager, error) {
if err := c.validate(); err != nil {
return nil, err
}

// Ensure that splitter is buildable
factory := splitter.NewFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod, int(c.MaxLogSize))
return c.buildManager(logger, emit, factory)
return c.buildManager(logger, emit, splitFunc, c.TrimConfig.Func())
}

func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, factory splitter.Factory) (*Manager, error) {
func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, splitFunc bufio.SplitFunc, trimFunc trim.Func) (*Manager, error) {
if emit == nil {
return nil, fmt.Errorf("must provide emit function")
}
Expand Down Expand Up @@ -175,11 +169,13 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
IncludeFileNameResolved: c.IncludeFileNameResolved,
IncludeFilePathResolved: c.IncludeFilePathResolved,
DeleteAtEOF: c.DeleteAfterRead,
FlushTimeout: c.FlushPeriod,
},
FromBeginning: startAtBeginning,
SplitterFactory: factory,
Encoding: enc,
HeaderConfig: hCfg,
FromBeginning: startAtBeginning,
Encoding: enc,
SplitFunc: splitFunc,
TrimFunc: trimFunc,
HeaderConfig: hCfg,
},
fileMatcher: fileMatcher,
roller: newRoller(),
Expand Down
55 changes: 34 additions & 21 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"runtime"
"time"

"go.uber.org/zap"
"golang.org/x/text/encoding"
Expand All @@ -16,53 +17,65 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

type Factory struct {
*zap.SugaredLogger
Config *Config
FromBeginning bool
SplitterFactory splitter.Factory
Encoding encoding.Encoding
HeaderConfig *header.Config
Config *Config
FromBeginning bool
Encoding encoding.Encoding
HeaderConfig *header.Config
SplitFunc bufio.SplitFunc
TrimFunc trim.Func
}

func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
return f.build(file, &Metadata{
m := &Metadata{
Fingerprint: fp,
FileAttributes: map[string]any{},
}, f.SplitterFactory.SplitFunc())
}
if f.Config.FlushTimeout > 0 {
m.FlushState = &flush.State{LastDataChange: time.Now()}
}
return f.build(file, m)
}

// copy creates a deep copy of a reader
func (f *Factory) Copy(old *Reader, newFile *os.File) (*Reader, error) {
lineSplitFunc := old.lineSplitFunc
if lineSplitFunc == nil {
lineSplitFunc = f.SplitterFactory.SplitFunc()
}
return f.build(newFile, &Metadata{
Fingerprint: old.Fingerprint.Copy(),
Offset: old.Offset,
FileAttributes: util.MapCopy(old.FileAttributes),
HeaderFinalized: old.HeaderFinalized,
}, lineSplitFunc)
FlushState: &flush.State{
LastDataChange: old.FlushState.LastDataChange,
LastDataLength: old.FlushState.LastDataLength,
},
})
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
return fingerprint.New(file, f.Config.FingerprintSize)
}

func (f *Factory) build(file *os.File, m *Metadata, lineSplitFunc bufio.SplitFunc) (r *Reader, err error) {
func (f *Factory) build(file *os.File, m *Metadata) (r *Reader, err error) {
r = &Reader{
Config: f.Config,
Metadata: m,
file: file,
fileName: file.Name(),
logger: f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.Encoding),
lineSplitFunc: lineSplitFunc,
Config: f.Config,
Metadata: m,
file: file,
fileName: file.Name(),
logger: f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.Encoding),
}

if m.FlushState == nil {
r.lineSplitFunc = trim.WithFunc(trim.ToLength(f.SplitFunc, f.Config.MaxLogSize), f.TrimFunc)
} else {
flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout)
r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.Config.MaxLogSize), f.TrimFunc)
}

if !f.FromBeginning {
Expand Down
4 changes: 4 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"os"
"time"

"go.uber.org/zap"

Expand All @@ -18,6 +19,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
)

type Config struct {
Expand All @@ -29,13 +31,15 @@ type Config struct {
IncludeFileNameResolved bool
IncludeFilePathResolved bool
DeleteAtEOF bool
FlushTimeout time.Duration
}

type Metadata struct {
Fingerprint *fingerprint.Fingerprint
Offset int64
FileAttributes map[string]any
HeaderFinalized bool
FlushState *flush.State
}

// Reader manages a single file
Expand Down
46 changes: 11 additions & 35 deletions pkg/stanza/fileconsumer/internal/splitter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,19 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"bufio"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

type Factory interface {
SplitFunc() bufio.SplitFunc
}

type factory struct {
splitFunc bufio.SplitFunc
trimFunc trim.Func
flushPeriod time.Duration
maxLength int
}

var _ Factory = (*factory)(nil)

func NewFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration, maxLength int) Factory {
return &factory{
splitFunc: splitFunc,
trimFunc: trimFunc,
flushPeriod: flushPeriod,
maxLength: maxLength,
}
}

// SplitFunc builds a bufio.SplitFunc based on the configuration
func (f *factory) SplitFunc() bufio.SplitFunc {
// First apply the base splitFunc.
// If no token is found, we may still flush one based on timing.
// If a token is emitted for any reason, we must then apply trim rules.
// We must trim to max length _before_ trimming whitespace because otherwise we
// cannot properly keep track of the number of bytes to advance.
// For instance, if we have advance: 5, token: []byte(" foo "):
// Trimming whitespace first would result in advance: 5, token: []byte("foo")
// Then if we trim to max length of 2, we don't know whether or not to reduce advance.
return trim.WithFunc(trim.ToLength(flush.WithPeriod(f.splitFunc, f.flushPeriod), f.maxLength), f.trimFunc)
// Func builds a bufio.SplitFunc based on the configuration
// First apply the base splitFunc.
// If a token is emitted for any reason, we must then apply trim rules.
// We must trim to max length _before_ trimming whitespace because otherwise we
// cannot properly keep track of the number of bytes to advance.
// For instance, if we have advance: 5, token: []byte(" foo "):
//
// Trimming whitespace first would result in advance: 5, token: []byte("foo")
// Then if we trim to max length of 2, we don't know whether or not to reduce advance.
func Func(splitFunc bufio.SplitFunc, maxLength int, trimFunc trim.Func) bufio.SplitFunc {
return trim.WithFunc(trim.ToLength(splitFunc, maxLength), trimFunc)
}
11 changes: 9 additions & 2 deletions pkg/stanza/fileconsumer/internal/splitter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split/splittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)
Expand Down Expand Up @@ -134,7 +135,13 @@ func TestFactorySplitFunc(t *testing.T) {
}

for _, tc := range testCases {
factory := NewFactory(tc.baseFunc, tc.trimFunc, tc.flushPeriod, tc.maxLength)
t.Run(tc.name, splittest.New(factory.SplitFunc(), tc.input, tc.steps...))
var splitFunc bufio.SplitFunc
if tc.flushPeriod > 0 {
s := &flush.State{LastDataChange: time.Now()}
splitFunc = Func(s.Func(tc.baseFunc, tc.flushPeriod), tc.maxLength, tc.trimFunc)
} else {
splitFunc = Func(tc.baseFunc, tc.maxLength, tc.trimFunc)
}
t.Run(tc.name, splittest.New(splitFunc, tc.input, tc.steps...))
}
}
9 changes: 5 additions & 4 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
Expand Down Expand Up @@ -237,10 +236,12 @@ func testReaderFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPer
FingerprintSize: fingerprint.DefaultSize,
MaxLogSize: maxLogSize,
Emit: testEmitFunc(emitChan),
FlushTimeout: flushPeriod,
},
FromBeginning: true,
SplitterFactory: splitter.NewFactory(splitFunc, trim.Whitespace, flushPeriod, maxLogSize),
Encoding: enc,
FromBeginning: true,
Encoding: enc,
SplitFunc: splitFunc,
TrimFunc: trim.Whitespace,
}, emitChan
}

Expand Down
Loading

0 comments on commit 24f1c59

Please sign in to comment.