Skip to content

Commit

Permalink
[pkg/stanza] move FlusherConfig and SplitterConfig to new file (#14762)
Browse files Browse the repository at this point in the history
move FlusherConfig and SplitterConfig from multiline file to new file.
  • Loading branch information
atingchen authored Oct 6, 2022
1 parent 0b850bf commit f247019
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 154 deletions.
111 changes: 111 additions & 0 deletions pkg/stanza/operator/helper/flusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"

import (
"bufio"
"time"
)

// FlusherConfig is a configuration of Flusher helper
type FlusherConfig struct {
Period time.Duration `mapstructure:"force_flush_period"`
}

// NewFlusherConfig creates a default Flusher config
func NewFlusherConfig() FlusherConfig {
return FlusherConfig{
// Empty or `0s` means that we will never force flush
Period: time.Millisecond * 500,
}
}

// Build creates Flusher from configuration
func (c *FlusherConfig) Build() *Flusher {
return &Flusher{
lastDataChange: time.Now(),
forcePeriod: c.Period,
previousDataLength: 0,
}
}

// Flusher keeps information about flush state
type Flusher struct {
// forcePeriod defines time from last flush which should pass before setting force to true.
// Never forces if forcePeriod is set to 0
forcePeriod time.Duration

// lastDataChange tracks date of last data change (including new data and flushes)
lastDataChange time.Time

// previousDataLength:
// if previousDataLength = 0 - no new data have been received after flush
// if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange
previousDataLength int
}

func (f *Flusher) UpdateDataChangeTime(length int) {
// Skip if length is greater than 0 and didn't changed
if length > 0 && length == f.previousDataLength {
return
}

// update internal properties with new values if data length changed
// because it means that data is flowing and being processed
f.previousDataLength = length
f.lastDataChange = time.Now()
}

// Flushed reset data length
func (f *Flusher) Flushed() {
f.UpdateDataChangeTime(0)
}

// ShouldFlush returns true if data should be forcefully flushed
func (f *Flusher) ShouldFlush() bool {
// Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0
return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
}

func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = splitFunc(data, atEOF)

// Return as it is in case of error
if err != nil {
return
}

// Return token
if token != nil {
// Inform flusher that we just flushed
f.Flushed()
return
}

// If there is no token, force flush eventually
if f.ShouldFlush() {
// Inform flusher that we just flushed
f.Flushed()
token = trimWhitespaces(data)
advance = len(data)
return
}

// Inform flusher that we didn't flushed
f.UpdateDataChangeTime(len(data))
return
}
}
154 changes: 0 additions & 154 deletions pkg/stanza/operator/helper/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,102 +19,10 @@ import (
"bytes"
"fmt"
"regexp"
"time"

"golang.org/x/text/encoding"
)

// FlusherConfig is a configuration of Flusher helper
type FlusherConfig struct {
Period time.Duration `mapstructure:"force_flush_period"`
}

// NewFlusherConfig creates a default Flusher config
func NewFlusherConfig() FlusherConfig {
return FlusherConfig{
// Empty or `0s` means that we will never force flush
Period: time.Millisecond * 500,
}
}

// Build creates Flusher from configuration
func (c *FlusherConfig) Build() *Flusher {
return &Flusher{
lastDataChange: time.Now(),
forcePeriod: c.Period,
previousDataLength: 0,
}
}

// Flusher keeps information about flush state
type Flusher struct {
// forcePeriod defines time from last flush which should pass before setting force to true.
// Never forces if forcePeriod is set to 0
forcePeriod time.Duration

// lastDataChange tracks date of last data change (including new data and flushes)
lastDataChange time.Time

// previousDataLength:
// if previousDataLength = 0 - no new data have been received after flush
// if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange
previousDataLength int
}

func (f *Flusher) UpdateDataChangeTime(length int) {
// Skip if length is greater than 0 and didn't changed
if length > 0 && length == f.previousDataLength {
return
}

// update internal properties with new values if data length changed
// because it means that data is flowing and being processed
f.previousDataLength = length
f.lastDataChange = time.Now()
}

// Flushed reset data length
func (f *Flusher) Flushed() {
f.UpdateDataChangeTime(0)
}

// ShouldFlush returns true if data should be forcefully flushed
func (f *Flusher) ShouldFlush() bool {
// Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0
return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
}

func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = splitFunc(data, atEOF)

// Return as it is in case of error
if err != nil {
return
}

// Return token
if token != nil {
// Inform flusher that we just flushed
f.Flushed()
return
}

// If there is no token, force flush eventually
if f.ShouldFlush() {
// Inform flusher that we just flushed
f.Flushed()
token = trimWhitespaces(data)
advance = len(data)
return
}

// Inform flusher that we didn't flushed
f.UpdateDataChangeTime(len(data))
return
}
}

// Multiline consists of splitFunc and variables needed to perform force flush
type Multiline struct {
SplitFunc bufio.SplitFunc
Expand Down Expand Up @@ -240,24 +148,6 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
}
}

// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. This is for when the encoding is nop
func SplitNone(maxLogSize int) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if len(data) >= maxLogSize {
return maxLogSize, data[:maxLogSize], nil
}

if !atEOF {
return 0, nil, nil
}

if len(data) == 0 {
return 0, nil, nil
}
return len(data), data, nil
}
}

// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that end with a match to the regex pattern provided
func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
Expand Down Expand Up @@ -345,47 +235,3 @@ func trimWhitespaces(data []byte) []byte {
}
return token
}

// SplitterConfig consolidates MultilineConfig and FlusherConfig
type SplitterConfig struct {
EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"`
Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
Flusher FlusherConfig `mapstructure:",squash,omitempty"`
}

// NewSplitterConfig returns default SplitterConfig
func NewSplitterConfig() SplitterConfig {
return SplitterConfig{
EncodingConfig: NewEncodingConfig(),
Multiline: NewMultilineConfig(),
Flusher: NewFlusherConfig(),
}
}

// Build builds Splitter struct
func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) {
enc, err := c.EncodingConfig.Build()
if err != nil {
return nil, err
}

flusher := c.Flusher.Build()
splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, flusher, maxLogSize)

if err != nil {
return nil, err
}

return &Splitter{
Encoding: enc,
Flusher: flusher,
SplitFunc: splitFunc,
}, nil
}

// Splitter consolidates Flusher and dependent splitFunc
type Splitter struct {
Encoding Encoding
SplitFunc bufio.SplitFunc
Flusher *Flusher
}
79 changes: 79 additions & 0 deletions pkg/stanza/operator/helper/splitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"

import "bufio"

// SplitterConfig consolidates MultilineConfig and FlusherConfig
type SplitterConfig struct {
EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"`
Flusher FlusherConfig `mapstructure:",squash,omitempty"`
Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
}

// NewSplitterConfig returns default SplitterConfig
func NewSplitterConfig() SplitterConfig {
return SplitterConfig{
EncodingConfig: NewEncodingConfig(),
Multiline: NewMultilineConfig(),
Flusher: NewFlusherConfig(),
}
}

// Build builds Splitter struct
func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) {
enc, err := c.EncodingConfig.Build()
if err != nil {
return nil, err
}

flusher := c.Flusher.Build()
splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, flusher, maxLogSize)

if err != nil {
return nil, err
}

return &Splitter{
Encoding: enc,
Flusher: flusher,
SplitFunc: splitFunc,
}, nil
}

// Splitter consolidates Flusher and dependent splitFunc
type Splitter struct {
Encoding Encoding
SplitFunc bufio.SplitFunc
Flusher *Flusher
}

// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. This is for when the encoding is nop
func SplitNone(maxLogSize int) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if len(data) >= maxLogSize {
return maxLogSize, data[:maxLogSize], nil
}

if !atEOF {
return 0, nil, nil
}

if len(data) == 0 {
return 0, nil, nil
}
return len(data), data, nil
}
}

0 comments on commit f247019

Please sign in to comment.