Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File input: Quiet close error #345

Merged
merged 3 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ test-integration:

.PHONY: bench
bench:
$(MAKE) for-all CMD="go test -run=NONE -bench '.*' ./... -benchmem"
go test -benchmem -run=^$$ -bench ^* ./...

.PHONY: clean
clean:
Expand Down
180 changes: 180 additions & 0 deletions operator/builtin/input/file/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package file

import (
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/testutil"
)

type fileInputBenchmark struct {
name string
paths []string
config func() *InputConfig
}

type benchFile struct {
*os.File
log func(int)
}

func simpleTextFile(file *os.File) *benchFile {
line := stringWithLength(49) + "\n"
return &benchFile{
File: file,
log: func(_ int) { file.WriteString(line) },
}
}

func BenchmarkFileInput(b *testing.B) {
cases := []fileInputBenchmark{
{
name: "Single",
paths: []string{
"file0.log",
},
config: func() *InputConfig {
cfg := NewInputConfig("test_id")
cfg.Include = []string{
"file0.log",
}
return cfg
},
},
{
name: "Glob",
paths: []string{
"file0.log",
"file1.log",
"file2.log",
"file3.log",
},
config: func() *InputConfig {
cfg := NewInputConfig("test_id")
cfg.Include = []string{"file*.log"}
return cfg
},
},
{
name: "MultiGlob",
paths: []string{
"file0.log",
"file1.log",
"log0.log",
"log1.log",
},
config: func() *InputConfig {
cfg := NewInputConfig("test_id")
cfg.Include = []string{
"file*.log",
"log*.log",
}
return cfg
},
},
{
name: "MaxConcurrent",
paths: []string{
"file0.log",
"file1.log",
"file2.log",
"file3.log",
},
config: func() *InputConfig {
cfg := NewInputConfig("test_id")
cfg.Include = []string{
"file*.log",
}
cfg.MaxConcurrentFiles = 1
return cfg
},
},
{
name: "FngrPrntLarge",
paths: []string{
"file0.log",
},
config: func() *InputConfig {
cfg := NewInputConfig("test_id")
cfg.Include = []string{
"file*.log",
}
cfg.FingerprintSize = 10 * defaultFingerprintSize
return cfg
},
},
{
name: "FngrPrntSmall",
paths: []string{
"file0.log",
},
config: func() *InputConfig {
cfg := NewInputConfig("test_id")
cfg.Include = []string{
"file*.log",
}
cfg.FingerprintSize = defaultFingerprintSize / 10
return cfg
},
},
}

for _, bench := range cases {
b.Run(bench.name, func(b *testing.B) {
rootDir, err := ioutil.TempDir("", "")
require.NoError(b, err)

files := []*benchFile{}
for _, path := range bench.paths {
file := openFile(b, filepath.Join(rootDir, path))
files = append(files, simpleTextFile(file))
}

cfg := bench.config()
cfg.OutputIDs = []string{"fake"}
for i, inc := range cfg.Include {
cfg.Include[i] = filepath.Join(rootDir, inc)
}
cfg.StartAt = "beginning"

ops, err := cfg.Build(testutil.NewBuildContext(b))
require.NoError(b, err)
op := ops[0]

fakeOutput := testutil.NewFakeOutput(b)
err = op.SetOutputs([]operator.Operator{fakeOutput})
require.NoError(b, err)

// write half the lines before starting
mid := b.N / 2
for i := 0; i < mid; i++ {
for _, file := range files {
file.log(i)
}
}

b.ResetTimer()
err = op.Start()
defer op.Stop()
require.NoError(b, err)

// write the remainder of lines while running
go func() {
for i := mid; i < b.N; i++ {
for _, file := range files {
file.log(i)
}
}
}()

for i := 0; i < b.N*len(files); i++ {
<-fakeOutput.Received
}
})
}
}
1 change: 1 addition & 0 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func NewInputConfig(operatorID string) *InputConfig {
IncludeFileName: true,
IncludeFilePath: false,
StartAt: "end",
FingerprintSize: defaultFingerprintSize,
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
Encoding: helper.NewEncodingConfig(),
Expand Down
154 changes: 153 additions & 1 deletion operator/builtin/input/file/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"github.com/observiq/stanza/operator/helper/operatortest"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/require"
)

func TestConfig(t *testing.T) {
func TestUnmarshal(t *testing.T) {
cases := []operatortest.ConfigUnmarshalTest{
{
Name: "default",
Expand Down Expand Up @@ -509,6 +512,155 @@ func TestConfig(t *testing.T) {
}
}

func TestBuild(t *testing.T) {
t.Parallel()
fakeOutput := testutil.NewMockOperator("$.fake")

basicConfig := func() *InputConfig {
cfg := NewInputConfig("testfile")
cfg.OutputIDs = []string{"fake"}
cfg.Include = []string{"/var/log/testpath.*"}
cfg.Exclude = []string{"/var/log/testpath.ex*"}
cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond}
return cfg
}

cases := []struct {
name string
modifyBaseConfig func(*InputConfig)
errorRequirement require.ErrorAssertionFunc
validate func(*testing.T, *InputOperator)
}{
{
"Basic",
func(f *InputConfig) { return },
require.NoError,
func(t *testing.T, f *InputOperator) {
require.Equal(t, f.OutputOperators[0], fakeOutput)
require.Equal(t, f.Include, []string{"/var/log/testpath.*"})
require.Equal(t, f.FilePathField, entry.NewNilField())
require.Equal(t, f.FileNameField, entry.NewLabelField("file_name"))
require.Equal(t, f.PollInterval, 10*time.Millisecond)
},
},
{
"BadIncludeGlob",
func(f *InputConfig) {
f.Include = []string{"["}
},
require.Error,
nil,
},
{
"BadExcludeGlob",
func(f *InputConfig) {
f.Include = []string{"["}
},
require.Error,
nil,
},
{
"MultilineConfiguredStartAndEndPatterns",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
LineEndPattern: "Exists",
LineStartPattern: "Exists",
}
},
require.Error,
nil,
},
{
"MultilineConfiguredStartPattern",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
LineStartPattern: "START.*",
}
},
require.NoError,
func(t *testing.T, f *InputOperator) {},
},
{
"MultilineConfiguredEndPattern",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
LineEndPattern: "END.*",
}
},
require.NoError,
func(t *testing.T, f *InputOperator) {},
},
{
"InvalidEncoding",
func(f *InputConfig) {
f.Encoding = helper.EncodingConfig{Encoding: "UTF-3233"}
},
require.Error,
nil,
},
{
"LineStartAndEnd",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
LineStartPattern: ".*",
LineEndPattern: ".*",
}
},
require.Error,
nil,
},
{
"NoLineStartOrEnd",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{}
},
require.NoError,
func(t *testing.T, f *InputOperator) {},
},
{
"InvalidLineStartRegex",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
LineStartPattern: "(",
}
},
require.Error,
nil,
},
{
"InvalidLineEndRegex",
func(f *InputConfig) {
f.Multiline = helper.MultilineConfig{
LineEndPattern: "(",
}
},
require.Error,
nil,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tc := tc
t.Parallel()
cfg := basicConfig()
tc.modifyBaseConfig(cfg)

ops, err := cfg.Build(testutil.NewBuildContext(t))
tc.errorRequirement(t, err)
if err != nil {
return
}
op := ops[0]

err = op.SetOutputs([]operator.Operator{fakeOutput})
require.NoError(t, err)

fileInput := op.(*InputOperator)
tc.validate(t, fileInput)
})
}
}
func defaultCfg() *InputConfig {
return NewInputConfig("file_input")
}
Expand Down
5 changes: 0 additions & 5 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,6 @@ func (f *InputOperator) poll(ctx context.Context) {
// Wait until all the reader goroutines are finished
wg.Wait()

// Close all files
for _, reader := range readers {
reader.Close()
}

f.saveCurrent(readers)
f.syncLastPollFiles()
}
Expand Down
Loading