From aee67dde5e2150db5dac54135b06023a1440d147 Mon Sep 17 00:00:00 2001 From: Mrod1598 Date: Tue, 1 Jun 2021 14:19:32 -0400 Subject: [PATCH 1/4] Fix plugin inputs (#156) * Add testing for IDs * Test Ops is pipeline * Add testing template * Change how default outputs are set * Add testing for outputs * Remove old default outputs func * Add ID testing * Add testing for output handling * Update Tests * Update mock comment * Implement PR feedback * Update comment * Implement PR feedback * Update Comments * Force CI * Implement PR feedback * Add new test case --- operator/builtin/transformer/router/router.go | 15 + operator/config.go | 1 + operator/config_test.go | 1 + operator/helper/output.go | 12 + operator/helper/writer.go | 18 +- operator/operator.go | 4 + pipeline/config.go | 53 ++- plugin/config.go | 2 + plugin/config_test.go | 329 ++++++++++++++++++ testutil/mocks.go | 6 + testutil/operator.go | 21 ++ testutil/operator_builder.go | 14 + 12 files changed, 462 insertions(+), 14 deletions(-) diff --git a/operator/builtin/transformer/router/router.go b/operator/builtin/transformer/router/router.go index f5260f5a..989a760a 100644 --- a/operator/builtin/transformer/router/router.go +++ b/operator/builtin/transformer/router/router.go @@ -95,6 +95,9 @@ func (c RouterOperatorConfig) Build(bc operator.BuildContext) ([]operator.Operat return []operator.Operator{routerOperator}, nil } +// BuildsMultipleOps returns false +func (c RouterOperatorConfig) BuildsMultipleOps() bool { return false } + // RouterOperator is an operator that routes entries based on matching expressions type RouterOperator struct { helper.BasicOperator @@ -157,6 +160,15 @@ func (p *RouterOperator) Outputs() []operator.Operator { return outputs } +// GetOutputIDs will return all connected operators. +func (p *RouterOperator) GetOutputIDs() []string { + outputs := make([]string, 0, len(p.routes)) + for _, route := range p.routes { + outputs = append(outputs, route.OutputIDs...) 
+ } + return outputs +} + // SetOutputs will set the outputs of the router operator. func (p *RouterOperator) SetOutputs(operators []operator.Operator) error { for _, route := range p.routes { @@ -170,6 +182,9 @@ func (p *RouterOperator) SetOutputs(operators []operator.Operator) error { return nil } +// SetOutputIDs will do nothing. +func (p *RouterOperator) SetOutputIDs(opIDs []string) {} + // findOperators will find a subset of operators from a collection. func (p *RouterOperator) findOperators(operators []operator.Operator, operatorIDs []string) ([]operator.Operator, error) { result := make([]operator.Operator, 0) diff --git a/operator/config.go b/operator/config.go index c48816b3..8ffb70cd 100644 --- a/operator/config.go +++ b/operator/config.go @@ -31,6 +31,7 @@ type Builder interface { ID() string Type() string Build(BuildContext) ([]Operator, error) + BuildsMultipleOps() bool } // UnmarshalJSON will unmarshal a config from JSON. diff --git a/operator/config_test.go b/operator/config_test.go index 0c2260cd..3fc0dd0f 100644 --- a/operator/config_test.go +++ b/operator/config_test.go @@ -31,6 +31,7 @@ type FakeBuilder struct { func (f *FakeBuilder) Build(context BuildContext) ([]Operator, error) { return nil, nil } func (f *FakeBuilder) ID() string { return "plugin" } func (f *FakeBuilder) Type() string { return "plugin" } +func (f *FakeBuilder) BuildsMultipleOps() bool { return false } func TestUnmarshalJSONErrors(t *testing.T) { t.Cleanup(func() { diff --git a/operator/helper/output.go b/operator/helper/output.go index 05976245..dafc20a5 100644 --- a/operator/helper/output.go +++ b/operator/helper/output.go @@ -45,6 +45,9 @@ func (c OutputConfig) Build(context operator.BuildContext) (OutputOperator, erro return outputOperator, nil } +// BuildsMultipleOps Returns false +func (c OutputConfig) BuildsMultipleOps() bool { return false } + // OutputOperator provides a basic implementation of an output operator. 
type OutputOperator struct { BasicOperator @@ -65,6 +68,11 @@ func (o *OutputOperator) Outputs() []operator.Operator { return []operator.Operator{} } +// GetOutputIDs will always return an empty array for an output operator. +func (o *OutputOperator) GetOutputIDs() []string { + return []string{} +} + // SetOutputs will return an error if called. func (o *OutputOperator) SetOutputs(operators []operator.Operator) error { return errors.NewError( @@ -72,3 +80,7 @@ func (o *OutputOperator) SetOutputs(operators []operator.Operator) error { "This is an unexpected internal error. Please submit a bug/issue.", ) } + +// SetOutputIDs is a no-op for an output operator. +func (o *OutputOperator) SetOutputIDs(opIDs []string) { +} diff --git a/operator/helper/writer.go b/operator/helper/writer.go index c2f42baa..4157ce92 100644 --- a/operator/helper/writer.go +++ b/operator/helper/writer.go @@ -45,9 +45,6 @@ func (c WriterConfig) Build(bc operator.BuildContext) (WriterOperator, error) { // Namespace all the output IDs namespacedIDs := c.OutputIDs.WithNamespace(bc) - if len(namespacedIDs) == 0 { - namespacedIDs = bc.DefaultOutputIDs - } writer := WriterOperator{ OutputIDs: namespacedIDs, @@ -56,6 +53,11 @@ func (c WriterConfig) Build(bc operator.BuildContext) (WriterOperator, error) { return writer, nil } +// BuildsMultipleOps returns false as a baseline. +func (c WriterConfig) BuildsMultipleOps() bool { + return false +} + // WriterOperator is an operator that can write to other operators. type WriterOperator struct { BasicOperator @@ -84,6 +86,11 @@ func (w *WriterOperator) Outputs() []operator.Operator { return w.OutputOperators } +// GetOutputIDs returns the output IDs of the writer operator. +func (w *WriterOperator) GetOutputIDs() []string { + return w.OutputIDs +} + // SetOutputs will set the outputs of the operator. 
func (w *WriterOperator) SetOutputs(operators []operator.Operator) error { outputOperators := make([]operator.Operator, 0) @@ -105,6 +112,11 @@ func (w *WriterOperator) SetOutputs(operators []operator.Operator) error { return nil } +// SetOutputIDs will set the output IDs of the operator. +func (w *WriterOperator) SetOutputIDs(opIds []string) { + w.OutputIDs = opIds +} + // FindOperator will find an operator matching the supplied id. func (w *WriterOperator) findOperator(operators []operator.Operator, operatorID string) (operator.Operator, bool) { for _, operator := range operators { diff --git a/operator/operator.go b/operator/operator.go index d4e28469..ba7fecfc 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -40,8 +40,12 @@ type Operator interface { CanOutput() bool // Outputs returns the list of connected outputs. Outputs() []Operator + // GetOutputIDs returns the IDs of the connected outputs. + GetOutputIDs() []string // SetOutputs will set the connected outputs. SetOutputs([]Operator) error + // SetOutputIDs will set the connected outputs' IDs. + SetOutputIDs([]string) // CanProcess indicates if the operator will process entries from other operators. CanProcess() bool diff --git a/pipeline/config.go b/pipeline/config.go index cf43ef02..856897d3 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -21,17 +21,25 @@ import ( // Config is the configuration of a pipeline. type Config []operator.Config -// BuildOperators builds the operators from the list of configs into operators +// BuildOperators builds the operators from the list of configs into operators. func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, error) { + // buildsMulti's key is the namespaced ID of a builder that builds multiple operators, e.g. a plugin. + // The value is the ID of the first operator that builder produced. 
+ buildsMulti := make(map[string]string) operators := make([]operator.Operator, 0, len(c)) - for i, builder := range c { - nbc := getBuildContextWithDefaultOutput(c, i, bc) - op, err := builder.Build(nbc) + for _, builder := range c { + op, err := builder.Build(bc) if err != nil { return nil, err } + + if builder.BuildsMultipleOps() { + buildsMulti[bc.PrependNamespace(builder.ID())] = op[0].ID() + } operators = append(operators, op...) } + SetOutputIDs(operators, buildsMulti) + return operators, nil } @@ -53,12 +61,35 @@ func (c Config) BuildPipeline(bc operator.BuildContext, defaultOperator operator return NewDirectedPipeline(operators) } -func getBuildContextWithDefaultOutput(configs []operator.Config, i int, bc operator.BuildContext) operator.BuildContext { - if i+1 >= len(configs) { - return bc - } +// SetOutputIDs Loops through all the operators and sets a default output to the next operator in the slice. +// Additionally, if the output is set to a plugin, it sets the output to the first operator in the plugins pipeline. +func SetOutputIDs(operators []operator.Operator, buildsMulti map[string]string) error { + for i, op := range operators { + // because no output is specified at this point for the last operator, + // it will always be empty and there is nothing after it to automatically point towards, so we break the loop + if i+1 == len(operators) { + break + } - id := configs[i+1].ID() - id = bc.PrependNamespace(id) - return bc.WithDefaultOutputIDs([]string{id}) + if len(op.GetOutputIDs()) == 0 { + op.SetOutputIDs([]string{operators[i+1].ID()}) + continue + } + + // Check if there are any plugins within the outputIDs of the operator. 
If there are, change the output to be the first op in the plugin + allOutputs := []string{} + pluginFound := false + for _, id := range op.GetOutputIDs() { + if pid, ok := buildsMulti[id]; ok { + id = pid + pluginFound = true + } + allOutputs = append(allOutputs, id) + } + + if pluginFound { + op.SetOutputIDs(allOutputs) + } + } + return nil } diff --git a/plugin/config.go b/plugin/config.go index bd6728c9..d3c4751a 100644 --- a/plugin/config.go +++ b/plugin/config.go @@ -60,6 +60,8 @@ func (c *Config) Build(bc operator.BuildContext) ([]operator.Operator, error) { return pipelineConfig.Pipeline.BuildOperators(nbc) } +func (c *Config) BuildsMultipleOps() bool { return true } + func (c *Config) getRenderParams(bc operator.BuildContext) map[string]interface{} { // Copy the parameters to avoid mutating them params := map[string]interface{}{} diff --git a/plugin/config_test.go b/plugin/config_test.go index 58ed601c..9a10e010 100644 --- a/plugin/config_test.go +++ b/plugin/config_test.go @@ -116,6 +116,7 @@ pipeline: type: noop - id: noop1 type: noop + output: {{ .output }} `) pluginName := "my_plugin" pluginVar, err := NewPlugin(pluginName, pluginContent) @@ -238,6 +239,334 @@ pipeline: } } +type PluginOutputIDTestCase struct { + Name string + PluginConfig pipeline.Config + ExpectedOpIDs map[string][]string +} + +func TestPluginOutputIDs(t *testing.T) { + // TODO: ids shouldn't need to be specified once autogen IDs are implemented + pluginContent := []byte(` +parameters: +pipeline: + - type: noop + - id: noop1 + type: noop + output: {{ .output }} +`) + pluginName := "my_plugin" + pluginVar, err := NewPlugin(pluginName, pluginContent) + require.NoError(t, err) + operator.RegisterPlugin(pluginVar.ID, pluginVar.NewBuilder) + + // TODO: remove ID assignment + pluginContent2 := []byte(` +parameters: +pipeline: + - type: noop + - id: noop1 + type: noop + output: {{ .output }} +`) + secondPlugin := "secondPlugin" + secondPluginVar, err := NewPlugin(secondPlugin, 
pluginContent2) + require.NoError(t, err) + operator.RegisterPlugin(secondPluginVar.ID, secondPluginVar.NewBuilder) + + pluginContent3 := []byte(` +parameters: +pipeline: + - type: my_plugin + - type: noop + output: {{ .output }} +`) + layeredPlugin := "layeredPlugin" + layeredPluginVar, err := NewPlugin(layeredPlugin, pluginContent3) + require.NoError(t, err) + operator.RegisterPlugin(layeredPluginVar.ID, layeredPluginVar.NewBuilder) + + cases := []PluginOutputIDTestCase{ + { + Name: "same_op_outside_plugin", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + // TODO: ID should be noop to start then auto gened to noop1 + Builder: noop.NewNoopOperatorConfig("noop1"), + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$.noop": {"$." + pluginName + ".noop"}, + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$.noop1"}, + }, + }, + { + Name: "two_plugins_with_same_ops", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: secondPlugin, + OperatorType: secondPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." 
+ pluginName + ".noop1": {"$." + secondPlugin + ".noop"}, + "$." + secondPlugin + ".noop": {"$." + secondPlugin + ".noop1"}, + }, + }, + { + Name: "two_plugins_specified_output", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + OutputIDs: []string{"noop"}, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: secondPlugin, + OperatorType: secondPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$.noop"}, + "$." + secondPlugin + ".noop": {"$." + secondPlugin + ".noop1"}, + "$." + secondPlugin + ".noop1": {"$.noop"}, + }, + }, + { + Name: "two_plugins_output_to_non_sequential_plugin", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + OutputIDs: []string{secondPlugin}, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: secondPlugin, + OperatorType: secondPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." 
+ pluginName + ".noop1": {"$." + secondPlugin + ".noop"}, + "$.noop": {"$." + secondPlugin + ".noop"}, + "$." + secondPlugin + ".noop": {"$." + secondPlugin + ".noop1"}, + }, + }, + { + Name: "two_plugins_with_multiple_outside_ops", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + // TODO: ID should be noop to start then auto gened to noop1 + Builder: noop.NewNoopOperatorConfig("noop1"), + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: secondPlugin, + OperatorType: secondPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + operator.Config{ + // TODO: ID should be noop to start then auto gened to noop2 + Builder: noop.NewNoopOperatorConfig("noop2"), + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$.noop": {"$." + pluginName + ".noop"}, + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$.noop1"}, + "$.noop1": {"$." + secondPlugin + ".noop"}, + "$." + secondPlugin + ".noop": {"$." + secondPlugin + ".noop1"}, + "$." 
+ secondPlugin + ".noop1": {"$.noop2"}, + }, + }, + { + Name: "two_plugins_of_same_type", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName + "1", + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$." + pluginName + "1.noop"}, + "$." + pluginName + "1.noop": {"$." + pluginName + "1.noop1"}, + }, + }, + { + Name: "plugin_within_a_plugin", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: layeredPlugin, + OperatorType: layeredPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: layeredPluginVar, + }, + }, + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$.layeredPlugin." + pluginName + ".noop": {"$.layeredPlugin." + pluginName + ".noop1"}, + "$.layeredPlugin." 
+ pluginName + ".noop1": {"$.layeredPlugin.noop"}, + "$.layeredPlugin.noop": {"$.noop"}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t)) + require.NoError(t, err) + + for i, op := range ops { + if i+1 < len(ops) { + out := op.GetOutputIDs() + t.Log("ID:" + op.ID()) + require.Equal(t, tc.ExpectedOpIDs[op.ID()], out) + } + } + }) + } +} + func TestBuildRecursiveFails(t *testing.T) { pluginConfig1 := []byte(` pipeline: diff --git a/testutil/mocks.go b/testutil/mocks.go index 65af6835..8698167b 100644 --- a/testutil/mocks.go +++ b/testutil/mocks.go @@ -65,9 +65,15 @@ func (f *FakeOutput) Logger() *zap.SugaredLogger { return f.SugaredLogger } // Outputs always returns nil for a fake output func (f *FakeOutput) Outputs() []operator.Operator { return nil } +// GetOutputIDs always returns nil for a fake output +func (f *FakeOutput) GetOutputIDs() []string { return nil } + // SetOutputs immediately returns nil for a fake output func (f *FakeOutput) SetOutputs(outputs []operator.Operator) error { return nil } +// SetOutputIDs does nothing for a fake output +func (f *FakeOutput) SetOutputIDs(s []string) {} + // Start immediately returns nil for a fake output func (f *FakeOutput) Start(_ operator.Persister) error { return nil } diff --git a/testutil/operator.go b/testutil/operator.go index 20ad941b..a255be13 100644 --- a/testutil/operator.go +++ b/testutil/operator.go @@ -46,6 +46,22 @@ func (_m *Operator) CanProcess() bool { return r0 } +// GetOutputIDs provides a mock function with given fields: +func (_m *Operator) GetOutputIDs() []string { + ret := _m.Called() + + var r0 []string + if rf, ok := ret.Get(0).(func() []string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *Operator) ID() string { ret := _m.Called() @@ -106,6 +122,11 @@ func 
(_m *Operator) Process(_a0 context.Context, _a1 *entry.Entry) error { return r0 } +// SetOutputIDs provides a mock function with given fields: _a0 +func (_m *Operator) SetOutputIDs(_a0 []string) { + _m.Called(_a0) +} + // SetOutputs provides a mock function with given fields: _a0 func (_m *Operator) SetOutputs(_a0 []operator.Operator) error { ret := _m.Called(_a0) diff --git a/testutil/operator_builder.go b/testutil/operator_builder.go index 653bd854..b14fe035 100644 --- a/testutil/operator_builder.go +++ b/testutil/operator_builder.go @@ -35,6 +35,20 @@ func (_m *OperatorBuilder) Build(_a0 operator.BuildContext) ([]operator.Operator return r0, r1 } +// BuildsMultipleOps provides a mock function with given fields: +func (_m *OperatorBuilder) BuildsMultipleOps() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *OperatorBuilder) ID() string { ret := _m.Called() From fa688924762b489a5d233b609d73ddd2282d9a75 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 1 Jun 2021 14:20:40 -0400 Subject: [PATCH 2/4] Cleanup file input 2 (#165) * Split file_test.go into multiple files * Rename utils_test.go to util_test.go * Move TestBuild into config_test.go, and all rotation tests into rotation_test.go --- operator/builtin/input/file/benchmark_test.go | 84 ++ operator/builtin/input/file/config_test.go | 154 +++- operator/builtin/input/file/file_test.go | 788 ------------------ operator/builtin/input/file/rotation_test.go | 450 ++++++++++ operator/builtin/input/file/util_test.go | 190 +++++ 5 files changed, 877 insertions(+), 789 deletions(-) create mode 100644 operator/builtin/input/file/benchmark_test.go create mode 100644 operator/builtin/input/file/rotation_test.go create mode 100644 operator/builtin/input/file/util_test.go diff --git a/operator/builtin/input/file/benchmark_test.go 
b/operator/builtin/input/file/benchmark_test.go new file mode 100644 index 00000000..f579d17b --- /dev/null +++ b/operator/builtin/input/file/benchmark_test.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" +) + +type fileInputBenchmark struct { + name string + config *InputConfig +} + +func BenchmarkFileInput(b *testing.B) { + cases := []fileInputBenchmark{ + { + "Default", + NewInputConfig("test_id"), + }, + { + "NoFileName", + func() *InputConfig { + cfg := NewInputConfig("test_id") + cfg.IncludeFileName = false + return cfg + }(), + }, + } + + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + tempDir := testutil.NewTempDir(b) + path := filepath.Join(tempDir, "in.log") + + cfg := tc.config + cfg.OutputIDs = []string{"fake"} + cfg.Include = []string{path} + cfg.StartAt = "beginning" + + ops, err := cfg.Build(testutil.NewBuildContext(b)) + require.NoError(b, err) + op := ops[0] + + fakeOutput := testutil.NewFakeOutput(b) + err = op.SetOutputs([]operator.Operator{fakeOutput}) + require.NoError(b, err) + + err = op.Start(testutil.NewMockPersister("test")) + defer op.Stop() + require.NoError(b, err) + + file, err := os.OpenFile(path, 
os.O_RDWR|os.O_CREATE, 0666) + require.NoError(b, err) + + for i := 0; i < b.N; i++ { + file.WriteString("testlog\n") + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + <-fakeOutput.Received + } + }) + } +} diff --git a/operator/builtin/input/file/config_test.go b/operator/builtin/input/file/config_test.go index 172d6e49..a9da65f2 100644 --- a/operator/builtin/input/file/config_test.go +++ b/operator/builtin/input/file/config_test.go @@ -22,11 +22,13 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper/operatortest" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" ) -func TestConfig(t *testing.T) { +func TestUnmarshal(t *testing.T) { cases := []operatortest.ConfigUnmarshalTest{ { Name: "default", @@ -512,6 +514,156 @@ func TestConfig(t *testing.T) { } } +func TestBuild(t *testing.T) { + t.Parallel() + fakeOutput := testutil.NewMockOperator("$.fake") + + basicConfig := func() *InputConfig { + cfg := NewInputConfig("testfile") + cfg.OutputIDs = []string{"fake"} + cfg.Include = []string{"/var/log/testpath.*"} + cfg.Exclude = []string{"/var/log/testpath.ex*"} + cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond} + return cfg + } + + cases := []struct { + name string + modifyBaseConfig func(*InputConfig) + errorRequirement require.ErrorAssertionFunc + validate func(*testing.T, *InputOperator) + }{ + { + "Basic", + func(f *InputConfig) {}, + require.NoError, + func(t *testing.T, f *InputOperator) { + require.Equal(t, f.OutputOperators[0], fakeOutput) + require.Equal(t, f.Include, []string{"/var/log/testpath.*"}) + require.Equal(t, f.FilePathField, entry.NewNilField()) + require.Equal(t, f.FileNameField, entry.NewAttributeField("file_name")) + require.Equal(t, 
f.PollInterval, 10*time.Millisecond) + }, + }, + { + "BadIncludeGlob", + func(f *InputConfig) { + f.Include = []string{"["} + }, + require.Error, + nil, + }, + { + "BadExcludeGlob", + func(f *InputConfig) { + f.Include = []string{"["} + }, + require.Error, + nil, + }, + { + "MultilineConfiguredStartAndEndPatterns", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineEndPattern: "Exists", + LineStartPattern: "Exists", + } + }, + require.Error, + nil, + }, + { + "MultilineConfiguredStartPattern", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineStartPattern: "START.*", + } + }, + require.NoError, + func(t *testing.T, f *InputOperator) {}, + }, + { + "MultilineConfiguredEndPattern", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineEndPattern: "END.*", + } + }, + require.NoError, + func(t *testing.T, f *InputOperator) {}, + }, + { + "InvalidEncoding", + func(f *InputConfig) { + f.Encoding = helper.EncodingConfig{Encoding: "UTF-3233"} + }, + require.Error, + nil, + }, + { + "LineStartAndEnd", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineStartPattern: ".*", + LineEndPattern: ".*", + } + }, + require.Error, + nil, + }, + { + "NoLineStartOrEnd", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{} + }, + require.NoError, + func(t *testing.T, f *InputOperator) {}, + }, + { + "InvalidLineStartRegex", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineStartPattern: "(", + } + }, + require.Error, + nil, + }, + { + "InvalidLineEndRegex", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineEndPattern: "(", + } + }, + require.Error, + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc := tc + t.Parallel() + cfg := basicConfig() + tc.modifyBaseConfig(cfg) + + ops, err := cfg.Build(testutil.NewBuildContext(t)) + tc.errorRequirement(t, err) + if err != nil { + return + } + op := ops[0] + + err = 
op.SetOutputs([]operator.Operator{fakeOutput}) + require.NoError(t, err) + + fileInput := op.(*InputOperator) + tc.validate(t, fileInput) + }) + } +} + func defaultCfg() *InputConfig { return NewInputConfig("file_input") } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 46583f2b..663876b8 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -17,253 +17,20 @@ package file import ( "context" "fmt" - "io" - "io/ioutil" - "log" - "math/rand" "os" "path/filepath" - "runtime" "strconv" "sync" "testing" "time" - "github.com/observiq/nanojack" "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-log-collection/entry" - "github.com/open-telemetry/opentelemetry-log-collection/operator" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" "github.com/open-telemetry/opentelemetry-log-collection/testutil" ) -func newDefaultConfig(tempDir string) *InputConfig { - cfg := NewInputConfig("testfile") - cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} - cfg.StartAt = "beginning" - cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} - cfg.OutputIDs = []string{"fake"} - return cfg -} - -func newTestFileOperator(t *testing.T, cfgMod func(*InputConfig), outMod func(*testutil.FakeOutput)) (*InputOperator, chan *entry.Entry, string) { - fakeOutput := testutil.NewFakeOutput(t) - if outMod != nil { - outMod(fakeOutput) - } - - tempDir := testutil.NewTempDir(t) - - cfg := newDefaultConfig(tempDir) - if cfgMod != nil { - cfgMod(cfg) - } - ops, err := cfg.Build(testutil.NewBuildContext(t)) - require.NoError(t, err) - op := ops[0] - - err = op.SetOutputs([]operator.Operator{fakeOutput}) - require.NoError(t, err) - - return op.(*InputOperator), fakeOutput.Received, tempDir -} - -func openFile(t testing.TB, path string) *os.File { - file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0777) - require.NoError(t, err) - 
t.Cleanup(func() { _ = file.Close() }) - return file -} - -func openTemp(t testing.TB, tempDir string) *os.File { - return openTempWithPattern(t, tempDir, "") -} - -func reopenTemp(t testing.TB, name string) *os.File { - return openTempWithPattern(t, filepath.Dir(name), filepath.Base(name)) -} - -func openTempWithPattern(t testing.TB, tempDir, pattern string) *os.File { - file, err := ioutil.TempFile(tempDir, pattern) - require.NoError(t, err) - t.Cleanup(func() { _ = file.Close() }) - return file -} - -func getRotatingLogger(t testing.TB, tempDir string, maxLines, maxBackups int, copyTruncate, sequential bool) *log.Logger { - file, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - require.NoError(t, file.Close()) // will be managed by rotator - - rotator := nanojack.Logger{ - Filename: file.Name(), - MaxLines: maxLines, - MaxBackups: maxBackups, - CopyTruncate: copyTruncate, - Sequential: sequential, - } - - t.Cleanup(func() { _ = rotator.Close() }) - - return log.New(&rotator, "", 0) -} - -func writeString(t testing.TB, file *os.File, s string) { - _, err := file.WriteString(s) - require.NoError(t, err) -} - -func TestBuild(t *testing.T) { - t.Parallel() - fakeOutput := testutil.NewMockOperator("$.fake") - - basicConfig := func() *InputConfig { - cfg := NewInputConfig("testfile") - cfg.OutputIDs = []string{"fake"} - cfg.Include = []string{"/var/log/testpath.*"} - cfg.Exclude = []string{"/var/log/testpath.ex*"} - cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond} - return cfg - } - - cases := []struct { - name string - modifyBaseConfig func(*InputConfig) - errorRequirement require.ErrorAssertionFunc - validate func(*testing.T, *InputOperator) - }{ - { - "Basic", - func(f *InputConfig) {}, - require.NoError, - func(t *testing.T, f *InputOperator) { - require.Equal(t, f.OutputOperators[0], fakeOutput) - require.Equal(t, f.Include, []string{"/var/log/testpath.*"}) - require.Equal(t, f.FilePathField, entry.NewNilField()) - 
require.Equal(t, f.FileNameField, entry.NewAttributeField("file_name")) - require.Equal(t, f.PollInterval, 10*time.Millisecond) - }, - }, - { - "BadIncludeGlob", - func(f *InputConfig) { - f.Include = []string{"["} - }, - require.Error, - nil, - }, - { - "BadExcludeGlob", - func(f *InputConfig) { - f.Include = []string{"["} - }, - require.Error, - nil, - }, - { - "MultilineConfiguredStartAndEndPatterns", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineEndPattern: "Exists", - LineStartPattern: "Exists", - } - }, - require.Error, - nil, - }, - { - "MultilineConfiguredStartPattern", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineStartPattern: "START.*", - } - }, - require.NoError, - func(t *testing.T, f *InputOperator) {}, - }, - { - "MultilineConfiguredEndPattern", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineEndPattern: "END.*", - } - }, - require.NoError, - func(t *testing.T, f *InputOperator) {}, - }, - { - "InvalidEncoding", - func(f *InputConfig) { - f.Encoding = helper.EncodingConfig{Encoding: "UTF-3233"} - }, - require.Error, - nil, - }, - { - "LineStartAndEnd", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineStartPattern: ".*", - LineEndPattern: ".*", - } - }, - require.Error, - nil, - }, - { - "NoLineStartOrEnd", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{} - }, - require.NoError, - func(t *testing.T, f *InputOperator) {}, - }, - { - "InvalidLineStartRegex", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineStartPattern: "(", - } - }, - require.Error, - nil, - }, - { - "InvalidLineEndRegex", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineEndPattern: "(", - } - }, - require.Error, - nil, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - tc := tc - t.Parallel() - cfg := basicConfig() - tc.modifyBaseConfig(cfg) - - ops, err := cfg.Build(testutil.NewBuildContext(t)) - 
tc.errorRequirement(t, err) - if err != nil { - return - } - op := ops[0] - - err = op.SetOutputs([]operator.Operator{fakeOutput}) - require.NoError(t, err) - - fileInput := op.(*InputOperator) - tc.validate(t, fileInput) - }) - } -} - func TestCleanStop(t *testing.T) { t.Parallel() t.Skip(`Skipping due to goroutine leak in opencensus. @@ -556,391 +323,6 @@ func TestMultiFileParallel_LiveFiles(t *testing.T) { wg.Wait() } -func TestMultiFileRotate(t *testing.T) { - t.Parallel() - - getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } - - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - - numFiles := 3 - numMessages := 3 - numRotations := 3 - - expected := make([]string, 0, numFiles*numMessages*numRotations) - for i := 0; i < numFiles; i++ { - for j := 0; j < numMessages; j++ { - for k := 0; k < numRotations; k++ { - expected = append(expected, getMessage(i, k, j)) - } - } - } - - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) - defer operator.Stop() - - temps := make([]*os.File, 0, numFiles) - for i := 0; i < numFiles; i++ { - temps = append(temps, openTemp(t, tempDir)) - } - - var wg sync.WaitGroup - for i, temp := range temps { - wg.Add(1) - go func(tf *os.File, f int) { - defer wg.Done() - for k := 0; k < numRotations; k++ { - for j := 0; j < numMessages; j++ { - writeString(t, tf, getMessage(f, k, j)+"\n") - } - - require.NoError(t, tf.Close()) - require.NoError(t, os.Rename(tf.Name(), fmt.Sprintf("%s.%d", tf.Name(), k))) - tf = reopenTemp(t, tf.Name()) - } - }(temp, i) - } - - waitForMessages(t, logReceived, expected) - wg.Wait() -} - -func TestMultiFileRotateSlow(t *testing.T) { - if runtime.GOOS == "windows" { - // Windows has very poor support for moving active files, so rotation is less commonly used - // This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358 - t.Skip() - } - - t.Parallel() - - operator, logReceived, tempDir := 
newTestFileOperator(t, nil, nil) - - getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } - fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } - baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) } - - numFiles := 3 - numMessages := 30 - numRotations := 3 - - expected := make([]string, 0, numFiles*numMessages*numRotations) - for i := 0; i < numFiles; i++ { - for j := 0; j < numMessages; j++ { - for k := 0; k < numRotations; k++ { - expected = append(expected, getMessage(i, k, j)) - } - } - } - - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) - defer operator.Stop() - - var wg sync.WaitGroup - for fileNum := 0; fileNum < numFiles; fileNum++ { - wg.Add(1) - go func(fn int) { - defer wg.Done() - - for rotationNum := 0; rotationNum < numRotations; rotationNum++ { - file := openFile(t, baseFileName(fn)) - for messageNum := 0; messageNum < numMessages; messageNum++ { - writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") - time.Sleep(5 * time.Millisecond) - } - - require.NoError(t, file.Close()) - require.NoError(t, os.Rename(baseFileName(fn), fileName(fn, rotationNum))) - } - }(fileNum) - } - - waitForMessages(t, logReceived, expected) - wg.Wait() -} - -func TestMultiCopyTruncateSlow(t *testing.T) { - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - - getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } - fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } - baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) } - - numFiles := 3 - numMessages := 30 - numRotations := 3 - - expected := make([]string, 0, numFiles*numMessages*numRotations) - for i := 0; i < numFiles; i++ { - for j := 0; j < numMessages; j++ { - for k := 0; k < 
numRotations; k++ { - expected = append(expected, getMessage(i, k, j)) - } - } - } - - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) - defer operator.Stop() - - var wg sync.WaitGroup - for fileNum := 0; fileNum < numFiles; fileNum++ { - wg.Add(1) - go func(fn int) { - defer wg.Done() - - for rotationNum := 0; rotationNum < numRotations; rotationNum++ { - file := openFile(t, baseFileName(fn)) - for messageNum := 0; messageNum < numMessages; messageNum++ { - writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") - time.Sleep(5 * time.Millisecond) - } - - _, err := file.Seek(0, 0) - require.NoError(t, err) - dst := openFile(t, fileName(fn, rotationNum)) - _, err = io.Copy(dst, file) - require.NoError(t, err) - require.NoError(t, dst.Close()) - require.NoError(t, file.Truncate(0)) - _, err = file.Seek(0, 0) - require.NoError(t, err) - file.Close() - } - }(fileNum) - } - - waitForMessages(t, logReceived, expected) - wg.Wait() -} - -type rotationTest struct { - name string - totalLines int - maxLinesPerFile int - maxBackupFiles int - writeInterval time.Duration - pollInterval time.Duration - ephemeralLines bool -} - -/* - When log files are rotated at extreme speeds, it is possible to miss some log entries. - This can happen when an individual log entry is written and deleted within the duration - of a single poll interval. For example, consider the following scenario: - - A log file may have up to 9 backups (10 total log files) - - Each log file may contain up to 10 entries - - Log entries are written at an interval of 10µs - - Log files are polled at an interval of 100ms - In this scenario, a log entry that is written may only exist on disk for about 1ms. - A polling interval of 100ms will most likely never produce a chance to read the log file. - - In production settings, this consideration is not very likely to be a problem, but it is - easy to encounter the issue in tests, and difficult to deterministically simulate edge cases. 
- However, the above understanding does allow for some consistent expectations. - 1) Cases that do not require deletion of old log entries should always pass. - 2) Cases where the polling interval is sufficiently rapid should always pass. - 3) When neither 1 nor 2 is true, there may be missing entries, but still no duplicates. - - The following method is provided largely as documentation of how this is expected to behave. - In practice, timing is largely dependent on the responsiveness of system calls. -*/ -func (rt rotationTest) expectEphemeralLines() bool { - // primary + backups - maxLinesInAllFiles := rt.maxLinesPerFile + rt.maxLinesPerFile*rt.maxBackupFiles - - // Will the test write enough lines to result in deletion of oldest backups? - maxBackupsExceeded := rt.totalLines > maxLinesInAllFiles - - // last line written in primary file will exist for l*b more writes - minTimeToLive := time.Duration(int(rt.writeInterval) * rt.maxLinesPerFile * rt.maxBackupFiles) - - // can a line be written and then rotated to deletion before ever observed? 
- return maxBackupsExceeded && rt.pollInterval > minTimeToLive -} - -func (rt rotationTest) run(tc rotationTest, copyTruncate, sequential bool) func(t *testing.T) { - return func(t *testing.T) { - operator, logReceived, tempDir := newTestFileOperator(t, - func(cfg *InputConfig) { - cfg.PollInterval = helper.NewDuration(tc.pollInterval) - }, - func(out *testutil.FakeOutput) { - out.Received = make(chan *entry.Entry, tc.totalLines) - }, - ) - logger := getRotatingLogger(t, tempDir, tc.maxLinesPerFile, tc.maxBackupFiles, copyTruncate, sequential) - - expected := make([]string, 0, tc.totalLines) - baseStr := stringWithLength(46) // + ' 123' - for i := 0; i < tc.totalLines; i++ { - expected = append(expected, fmt.Sprintf("%s %3d", baseStr, i)) - } - - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) - defer operator.Stop() - - for _, message := range expected { - logger.Println(message) - time.Sleep(tc.writeInterval) - } - - received := make([]string, 0, tc.totalLines) - LOOP: - for { - select { - case e := <-logReceived: - received = append(received, e.Body.(string)) - case <-time.After(200 * time.Millisecond): - break LOOP - } - } - - if tc.ephemeralLines { - if !tc.expectEphemeralLines() { - // This is helpful for test development, and ensures the sample computation is used - t.Logf("Potentially unstable ephemerality expectation for test: %s", tc.name) - } - require.Subset(t, expected, received) - } else { - require.ElementsMatch(t, expected, received) - } - } -} - -func TestRotation(t *testing.T) { - cases := []rotationTest{ - { - name: "NoRotation", - totalLines: 10, - maxLinesPerFile: 10, - maxBackupFiles: 1, - writeInterval: time.Millisecond, - pollInterval: 10 * time.Millisecond, - }, - { - name: "NoDeletion", - totalLines: 20, - maxLinesPerFile: 10, - maxBackupFiles: 1, - writeInterval: time.Millisecond, - pollInterval: 10 * time.Millisecond, - }, - { - name: "Deletion", - totalLines: 30, - maxLinesPerFile: 10, - maxBackupFiles: 1, - 
writeInterval: time.Millisecond, - pollInterval: 10 * time.Millisecond, - ephemeralLines: true, - }, - { - name: "Deletion/ExceedFingerprint", - totalLines: 300, - maxLinesPerFile: 100, - maxBackupFiles: 1, - writeInterval: time.Millisecond, - pollInterval: 10 * time.Millisecond, - ephemeralLines: true, - }, - } - - for _, tc := range cases { - t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false)) - t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true)) - t.Run(fmt.Sprintf("%s/CopyTruncateTimestamped", tc.name), tc.run(tc, true, false)) - t.Run(fmt.Sprintf("%s/CopyTruncateSequential", tc.name), tc.run(tc, true, true)) - } -} - -func TestMoveFile(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Moving files while open is unsupported on Windows") - } - t.Parallel() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - operator.persister = testutil.NewMockPersister("test") - - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\n") - temp1.Close() - - operator.poll(context.Background()) - defer operator.Stop() - - waitForMessage(t, logReceived, "testlog1") - - // Wait until all goroutines are finished before renaming - operator.wg.Wait() - err := os.Rename(temp1.Name(), fmt.Sprintf("%s.2", temp1.Name())) - require.NoError(t, err) - - operator.poll(context.Background()) - expectNoMessages(t, logReceived) -} - -// TruncateThenWrite tests that, after a file has been truncated, -// any new writes are picked up -func TestTruncateThenWrite(t *testing.T) { - t.Parallel() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - operator.persister = testutil.NewMockPersister("test") - - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\ntestlog2\n") - - operator.poll(context.Background()) - defer operator.Stop() - - waitForMessage(t, logReceived, "testlog1") - waitForMessage(t, logReceived, "testlog2") - - require.NoError(t, temp1.Truncate(0)) - temp1.Seek(0, 
0) - - writeString(t, temp1, "testlog3\n") - operator.poll(context.Background()) - waitForMessage(t, logReceived, "testlog3") - expectNoMessages(t, logReceived) -} - -// CopyTruncateWriteBoth tests that when a file is copied -// with unread logs on the end, then the original is truncated, -// we get the unread logs on the copy as well as any new logs -// written to the truncated file -func TestCopyTruncateWriteBoth(t *testing.T) { - t.Parallel() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - operator.persister = testutil.NewMockPersister("test") - - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\ntestlog2\n") - - operator.poll(context.Background()) - defer operator.Stop() - - waitForMessage(t, logReceived, "testlog1") - waitForMessage(t, logReceived, "testlog2") - operator.wg.Wait() // wait for all goroutines to finish - - // Copy the first file to a new file, and add another log - temp2 := openTemp(t, tempDir) - _, err := io.Copy(temp2, temp1) - require.NoError(t, err) - - // Truncate original file - require.NoError(t, temp1.Truncate(0)) - temp1.Seek(0, 0) - - // Write to original and new file - writeString(t, temp2, "testlog3\n") - writeString(t, temp1, "testlog4\n") - - // Expect both messages to come through - operator.poll(context.Background()) - waitForMessages(t, logReceived, []string{"testlog3", "testlog4"}) -} - // OffsetsAfterRestart tests that a operator is able to load // its offsets after a restart func TestOffsetsAfterRestart(t *testing.T) { @@ -1015,38 +397,6 @@ func TestOffsetsAfterRestart_BigFilesWrittenWhileOff(t *testing.T) { waitForMessage(t, logReceived, log2) } -func TestFileMovedWhileOff_BigFiles(t *testing.T) { - t.Parallel() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - persister := testutil.NewMockPersister("test") - - log1 := stringWithLength(1000) - log2 := stringWithLength(1000) - - temp := openTemp(t, tempDir) - writeString(t, temp, log1+"\n") - require.NoError(t, 
temp.Close()) - - // Start the operator - require.NoError(t, operator.Start(persister)) - defer operator.Stop() - waitForMessage(t, logReceived, log1) - - // Stop the operator, then rename and write a new log - require.NoError(t, operator.Stop()) - - err := os.Rename(temp.Name(), fmt.Sprintf("%s2", temp.Name())) - require.NoError(t, err) - - temp = reopenTemp(t, temp.Name()) - require.NoError(t, err) - writeString(t, temp, log2+"\n") - - // Expect the message written to the new log to come through - require.NoError(t, operator.Start(persister)) - waitForMessage(t, logReceived, log2) -} - func TestManyLogsDelivered(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) @@ -1157,75 +507,6 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { require.Equal(t, []byte("testlog1\n"), reader.Fingerprint.FirstBytes) } -func stringWithLength(length int) string { - charset := "abcdefghijklmnopqrstuvwxyz" - b := make([]byte, length) - for i := range b { - b[i] = charset[rand.Intn(len(charset))] - } - return string(b) -} - -func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { - select { - case e := <-c: - return e - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for message") - return nil - } -} - -func waitForN(t *testing.T, c chan *entry.Entry, n int) []string { - messages := make([]string, 0, n) - for i := 0; i < n; i++ { - select { - case e := <-c: - messages = append(messages, e.Body.(string)) - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for message") - return nil - } - } - return messages -} - -func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { - select { - case e := <-c: - require.Equal(t, expected, e.Body.(string)) - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for message", expected) - } -} - -func waitForMessages(t *testing.T, c chan *entry.Entry, expected []string) { - receivedMessages := make([]string, 0, 
len(expected)) -LOOP: - for { - select { - case e := <-c: - receivedMessages = append(receivedMessages, e.Body.(string)) - case <-time.After(time.Second): - break LOOP - } - } - - require.ElementsMatch(t, expected, receivedMessages) -} - -func expectNoMessages(t *testing.T, c chan *entry.Entry) { - expectNoMessagesUntil(t, c, 200*time.Millisecond) -} - -func expectNoMessagesUntil(t *testing.T, c chan *entry.Entry, d time.Duration) { - select { - case e := <-c: - require.FailNow(t, "Received unexpected message", "Message: %s", e.Body.(string)) - case <-time.After(d): - } -} - func TestEncodings(t *testing.T) { t.Parallel() cases := []struct { @@ -1311,64 +592,6 @@ func TestEncodings(t *testing.T) { } } -type fileInputBenchmark struct { - name string - config *InputConfig -} - -func BenchmarkFileInput(b *testing.B) { - cases := []fileInputBenchmark{ - { - "Default", - NewInputConfig("test_id"), - }, - { - "NoFileName", - func() *InputConfig { - cfg := NewInputConfig("test_id") - cfg.IncludeFileName = false - return cfg - }(), - }, - } - - for _, tc := range cases { - b.Run(tc.name, func(b *testing.B) { - tempDir := testutil.NewTempDir(b) - path := filepath.Join(tempDir, "in.log") - - cfg := tc.config - cfg.OutputIDs = []string{"fake"} - cfg.Include = []string{path} - cfg.StartAt = "beginning" - - ops, err := cfg.Build(testutil.NewBuildContext(b)) - require.NoError(b, err) - op := ops[0] - - fakeOutput := testutil.NewFakeOutput(b) - err = op.SetOutputs([]operator.Operator{fakeOutput}) - require.NoError(b, err) - - err = op.Start(testutil.NewMockPersister("test")) - defer op.Stop() - require.NoError(b, err) - - file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) - require.NoError(b, err) - - for i := 0; i < b.N; i++ { - file.WriteString("testlog\n") - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - <-fakeOutput.Received - } - }) - } -} - // TestExclude tests that a log file will be excluded if it matches the // glob specified in the operator. 
func TestExclude(t *testing.T) { @@ -1411,14 +634,3 @@ func TestExcludeDuplicates(t *testing.T) { matches := getMatches(includes, excludes) require.ElementsMatch(t, matches, paths[2:3]) } - -// writes file with the specified file names and returns their full paths in order -func writeTempFiles(tempDir string, names []string) []string { - result := make([]string, 0, len(names)) - for _, name := range names { - path := filepath.Join(tempDir, name) - ioutil.WriteFile(path, []byte(name), 0755) - result = append(result, path) - } - return result -} diff --git a/operator/builtin/input/file/rotation_test.go b/operator/builtin/input/file/rotation_test.go new file mode 100644 index 00000000..72a43b28 --- /dev/null +++ b/operator/builtin/input/file/rotation_test.go @@ -0,0 +1,450 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package file + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" +) + +func TestMultiFileRotate(t *testing.T) { + t.Parallel() + + getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } + + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + + numFiles := 3 + numMessages := 3 + numRotations := 3 + + expected := make([]string, 0, numFiles*numMessages*numRotations) + for i := 0; i < numFiles; i++ { + for j := 0; j < numMessages; j++ { + for k := 0; k < numRotations; k++ { + expected = append(expected, getMessage(i, k, j)) + } + } + } + + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + temps := make([]*os.File, 0, numFiles) + for i := 0; i < numFiles; i++ { + temps = append(temps, openTemp(t, tempDir)) + } + + var wg sync.WaitGroup + for i, temp := range temps { + wg.Add(1) + go func(tf *os.File, f int) { + defer wg.Done() + for k := 0; k < numRotations; k++ { + for j := 0; j < numMessages; j++ { + writeString(t, tf, getMessage(f, k, j)+"\n") + } + + require.NoError(t, tf.Close()) + require.NoError(t, os.Rename(tf.Name(), fmt.Sprintf("%s.%d", tf.Name(), k))) + tf = reopenTemp(t, tf.Name()) + } + }(temp, i) + } + + waitForMessages(t, logReceived, expected) + wg.Wait() +} + +func TestMultiFileRotateSlow(t *testing.T) { + if runtime.GOOS == "windows" { + // Windows has very poor support for moving active files, so rotation is less commonly used + // This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358 + t.Skip() + } + + t.Parallel() + + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + + getMessage := 
func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } + fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } + baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) } + + numFiles := 3 + numMessages := 30 + numRotations := 3 + + expected := make([]string, 0, numFiles*numMessages*numRotations) + for i := 0; i < numFiles; i++ { + for j := 0; j < numMessages; j++ { + for k := 0; k < numRotations; k++ { + expected = append(expected, getMessage(i, k, j)) + } + } + } + + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + var wg sync.WaitGroup + for fileNum := 0; fileNum < numFiles; fileNum++ { + wg.Add(1) + go func(fn int) { + defer wg.Done() + + for rotationNum := 0; rotationNum < numRotations; rotationNum++ { + file := openFile(t, baseFileName(fn)) + for messageNum := 0; messageNum < numMessages; messageNum++ { + writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") + time.Sleep(5 * time.Millisecond) + } + + require.NoError(t, file.Close()) + require.NoError(t, os.Rename(baseFileName(fn), fileName(fn, rotationNum))) + } + }(fileNum) + } + + waitForMessages(t, logReceived, expected) + wg.Wait() +} + +func TestMultiCopyTruncateSlow(t *testing.T) { + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + + getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } + fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } + baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) } + + numFiles := 3 + numMessages := 30 + numRotations := 3 + + expected := make([]string, 0, numFiles*numMessages*numRotations) + for i := 0; i < numFiles; i++ { + for j := 0; j < numMessages; j++ { + for k := 0; k < numRotations; k++ { + expected = append(expected, 
getMessage(i, k, j)) + } + } + } + + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + var wg sync.WaitGroup + for fileNum := 0; fileNum < numFiles; fileNum++ { + wg.Add(1) + go func(fn int) { + defer wg.Done() + + for rotationNum := 0; rotationNum < numRotations; rotationNum++ { + file := openFile(t, baseFileName(fn)) + for messageNum := 0; messageNum < numMessages; messageNum++ { + writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") + time.Sleep(5 * time.Millisecond) + } + + _, err := file.Seek(0, 0) + require.NoError(t, err) + dst := openFile(t, fileName(fn, rotationNum)) + _, err = io.Copy(dst, file) + require.NoError(t, err) + require.NoError(t, dst.Close()) + require.NoError(t, file.Truncate(0)) + _, err = file.Seek(0, 0) + require.NoError(t, err) + file.Close() + } + }(fileNum) + } + + waitForMessages(t, logReceived, expected) + wg.Wait() +} + +type rotationTest struct { + name string + totalLines int + maxLinesPerFile int + maxBackupFiles int + writeInterval time.Duration + pollInterval time.Duration + ephemeralLines bool +} + +/* + When log files are rotated at extreme speeds, it is possible to miss some log entries. + This can happen when an individual log entry is written and deleted within the duration + of a single poll interval. For example, consider the following scenario: + - A log file may have up to 9 backups (10 total log files) + - Each log file may contain up to 10 entries + - Log entries are written at an interval of 10µs + - Log files are polled at an interval of 100ms + In this scenario, a log entry that is written may only exist on disk for about 1ms. + A polling interval of 100ms will most likely never produce a chance to read the log file. + + In production settings, this consideration is not very likely to be a problem, but it is + easy to encounter the issue in tests, and difficult to deterministically simulate edge cases. 
+ However, the above understanding does allow for some consistent expectations. + 1) Cases that do not require deletion of old log entries should always pass. + 2) Cases where the polling interval is sufficiently rapid should always pass. + 3) When neither 1 nor 2 is true, there may be missing entries, but still no duplicates. + + The following method is provided largely as documentation of how this is expected to behave. + In practice, timing is largely dependent on the responsiveness of system calls. +*/ +func (rt rotationTest) expectEphemeralLines() bool { + // primary + backups + maxLinesInAllFiles := rt.maxLinesPerFile + rt.maxLinesPerFile*rt.maxBackupFiles + + // Will the test write enough lines to result in deletion of oldest backups? + maxBackupsExceeded := rt.totalLines > maxLinesInAllFiles + + // last line written in primary file will exist for l*b more writes + minTimeToLive := time.Duration(int(rt.writeInterval) * rt.maxLinesPerFile * rt.maxBackupFiles) + + // can a line be written and then rotated to deletion before ever observed? 
+ return maxBackupsExceeded && rt.pollInterval > minTimeToLive +} + +func (rt rotationTest) run(tc rotationTest, copyTruncate, sequential bool) func(t *testing.T) { + return func(t *testing.T) { + operator, logReceived, tempDir := newTestFileOperator(t, + func(cfg *InputConfig) { + cfg.PollInterval = helper.NewDuration(tc.pollInterval) + }, + func(out *testutil.FakeOutput) { + out.Received = make(chan *entry.Entry, tc.totalLines) + }, + ) + logger := getRotatingLogger(t, tempDir, tc.maxLinesPerFile, tc.maxBackupFiles, copyTruncate, sequential) + + expected := make([]string, 0, tc.totalLines) + baseStr := stringWithLength(46) // + ' 123' + for i := 0; i < tc.totalLines; i++ { + expected = append(expected, fmt.Sprintf("%s %3d", baseStr, i)) + } + + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + for _, message := range expected { + logger.Println(message) + time.Sleep(tc.writeInterval) + } + + received := make([]string, 0, tc.totalLines) + LOOP: + for { + select { + case e := <-logReceived: + received = append(received, e.Body.(string)) + case <-time.After(200 * time.Millisecond): + break LOOP + } + } + + if tc.ephemeralLines { + if !tc.expectEphemeralLines() { + // This is helpful for test development, and ensures the sample computation is used + t.Logf("Potentially unstable ephemerality expectation for test: %s", tc.name) + } + require.Subset(t, expected, received) + } else { + require.ElementsMatch(t, expected, received) + } + } +} + +func TestRotation(t *testing.T) { + cases := []rotationTest{ + { + name: "NoRotation", + totalLines: 10, + maxLinesPerFile: 10, + maxBackupFiles: 1, + writeInterval: time.Millisecond, + pollInterval: 10 * time.Millisecond, + }, + { + name: "NoDeletion", + totalLines: 20, + maxLinesPerFile: 10, + maxBackupFiles: 1, + writeInterval: time.Millisecond, + pollInterval: 10 * time.Millisecond, + }, + { + name: "Deletion", + totalLines: 30, + maxLinesPerFile: 10, + maxBackupFiles: 1, + 
writeInterval: time.Millisecond, + pollInterval: 10 * time.Millisecond, + ephemeralLines: true, + }, + { + name: "Deletion/ExceedFingerprint", + totalLines: 300, + maxLinesPerFile: 100, + maxBackupFiles: 1, + writeInterval: time.Millisecond, + pollInterval: 10 * time.Millisecond, + ephemeralLines: true, + }, + } + + for _, tc := range cases { + t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false)) + t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true)) + t.Run(fmt.Sprintf("%s/CopyTruncateTimestamped", tc.name), tc.run(tc, true, false)) + t.Run(fmt.Sprintf("%s/CopyTruncateSequential", tc.name), tc.run(tc, true, true)) + } +} + +func TestMoveFile(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Moving files while open is unsupported on Windows") + } + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator.persister = testutil.NewMockPersister("test") + + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\n") + temp1.Close() + + operator.poll(context.Background()) + defer operator.Stop() + + waitForMessage(t, logReceived, "testlog1") + + // Wait until all goroutines are finished before renaming + operator.wg.Wait() + err := os.Rename(temp1.Name(), fmt.Sprintf("%s.2", temp1.Name())) + require.NoError(t, err) + + operator.poll(context.Background()) + expectNoMessages(t, logReceived) +} + +// TruncateThenWrite tests that, after a file has been truncated, +// any new writes are picked up +func TestTruncateThenWrite(t *testing.T) { + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator.persister = testutil.NewMockPersister("test") + + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\ntestlog2\n") + + operator.poll(context.Background()) + defer operator.Stop() + + waitForMessage(t, logReceived, "testlog1") + waitForMessage(t, logReceived, "testlog2") + + require.NoError(t, temp1.Truncate(0)) + temp1.Seek(0, 
0) + + writeString(t, temp1, "testlog3\n") + operator.poll(context.Background()) + waitForMessage(t, logReceived, "testlog3") + expectNoMessages(t, logReceived) +} + +// CopyTruncateWriteBoth tests that when a file is copied +// with unread logs on the end, then the original is truncated, +// we get the unread logs on the copy as well as any new logs +// written to the truncated file +func TestCopyTruncateWriteBoth(t *testing.T) { + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator.persister = testutil.NewMockPersister("test") + + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\ntestlog2\n") + + operator.poll(context.Background()) + defer operator.Stop() + + waitForMessage(t, logReceived, "testlog1") + waitForMessage(t, logReceived, "testlog2") + operator.wg.Wait() // wait for all goroutines to finish + + // Copy the first file to a new file, and add another log + temp2 := openTemp(t, tempDir) + _, err := io.Copy(temp2, temp1) + require.NoError(t, err) + + // Truncate original file + require.NoError(t, temp1.Truncate(0)) + temp1.Seek(0, 0) + + // Write to original and new file + writeString(t, temp2, "testlog3\n") + writeString(t, temp1, "testlog4\n") + + // Expect both messages to come through + operator.poll(context.Background()) + waitForMessages(t, logReceived, []string{"testlog3", "testlog4"}) +} + +func TestFileMovedWhileOff_BigFiles(t *testing.T) { + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + persister := testutil.NewMockPersister("test") + + log1 := stringWithLength(1000) + log2 := stringWithLength(1000) + + temp := openTemp(t, tempDir) + writeString(t, temp, log1+"\n") + require.NoError(t, temp.Close()) + + // Start the operator + require.NoError(t, operator.Start(persister)) + defer operator.Stop() + waitForMessage(t, logReceived, log1) + + // Stop the operator, then rename and write a new log + require.NoError(t, operator.Stop()) + + err := 
os.Rename(temp.Name(), fmt.Sprintf("%s2", temp.Name())) + require.NoError(t, err) + + temp = reopenTemp(t, temp.Name()) + require.NoError(t, err) + writeString(t, temp, log2+"\n") + + // Expect the message written to the new log to come through + require.NoError(t, operator.Start(persister)) + waitForMessage(t, logReceived, log2) +} diff --git a/operator/builtin/input/file/util_test.go b/operator/builtin/input/file/util_test.go new file mode 100644 index 00000000..37920f4d --- /dev/null +++ b/operator/builtin/input/file/util_test.go @@ -0,0 +1,190 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package file + +import ( + "fmt" + "io/ioutil" + "log" + "math/rand" + "os" + "path/filepath" + "testing" + "time" + + "github.com/observiq/nanojack" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" +) + +func newDefaultConfig(tempDir string) *InputConfig { + cfg := NewInputConfig("testfile") + cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} + cfg.StartAt = "beginning" + cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} + cfg.OutputIDs = []string{"fake"} + return cfg +} + +func newTestFileOperator(t *testing.T, cfgMod func(*InputConfig), outMod func(*testutil.FakeOutput)) (*InputOperator, chan *entry.Entry, string) { + fakeOutput := testutil.NewFakeOutput(t) + if outMod != nil { + outMod(fakeOutput) + } + + tempDir := testutil.NewTempDir(t) + + cfg := newDefaultConfig(tempDir) + if cfgMod != nil { + cfgMod(cfg) + } + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + + err = op.SetOutputs([]operator.Operator{fakeOutput}) + require.NoError(t, err) + + return op.(*InputOperator), fakeOutput.Received, tempDir +} + +func openFile(t testing.TB, path string) *os.File { + file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0777) + require.NoError(t, err) + t.Cleanup(func() { _ = file.Close() }) + return file +} + +func openTemp(t testing.TB, tempDir string) *os.File { + return openTempWithPattern(t, tempDir, "") +} + +func reopenTemp(t testing.TB, name string) *os.File { + return openTempWithPattern(t, filepath.Dir(name), filepath.Base(name)) +} + +func openTempWithPattern(t testing.TB, tempDir, pattern string) *os.File { + file, err := ioutil.TempFile(tempDir, pattern) + require.NoError(t, err) + t.Cleanup(func() { _ = file.Close() }) + 
return file +} + +func getRotatingLogger(t testing.TB, tempDir string, maxLines, maxBackups int, copyTruncate, sequential bool) *log.Logger { + file, err := ioutil.TempFile(tempDir, "") + require.NoError(t, err) + require.NoError(t, file.Close()) // will be managed by rotator + + rotator := nanojack.Logger{ + Filename: file.Name(), + MaxLines: maxLines, + MaxBackups: maxBackups, + CopyTruncate: copyTruncate, + Sequential: sequential, + } + + t.Cleanup(func() { _ = rotator.Close() }) + + return log.New(&rotator, "", 0) +} + +func writeString(t testing.TB, file *os.File, s string) { + _, err := file.WriteString(s) + require.NoError(t, err) +} + +func stringWithLength(length int) string { + charset := "abcdefghijklmnopqrstuvwxyz" + b := make([]byte, length) + for i := range b { + b[i] = charset[rand.Intn(len(charset))] + } + return string(b) +} + +func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { + select { + case e := <-c: + return e + case <-time.After(time.Second): + require.FailNow(t, "Timed out waiting for message") + return nil + } +} + +func waitForN(t *testing.T, c chan *entry.Entry, n int) []string { + messages := make([]string, 0, n) + for i := 0; i < n; i++ { + select { + case e := <-c: + messages = append(messages, e.Body.(string)) + case <-time.After(time.Second): + require.FailNow(t, "Timed out waiting for message") + return nil + } + } + return messages +} + +func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { + select { + case e := <-c: + require.Equal(t, expected, e.Body.(string)) + case <-time.After(time.Second): + require.FailNow(t, "Timed out waiting for message", expected) + } +} + +func waitForMessages(t *testing.T, c chan *entry.Entry, expected []string) { + receivedMessages := make([]string, 0, len(expected)) +LOOP: + for { + select { + case e := <-c: + receivedMessages = append(receivedMessages, e.Body.(string)) + case <-time.After(time.Second): + break LOOP + } + } + + require.ElementsMatch(t, expected, 
receivedMessages) +} + +func expectNoMessages(t *testing.T, c chan *entry.Entry) { + expectNoMessagesUntil(t, c, 200*time.Millisecond) +} + +func expectNoMessagesUntil(t *testing.T, c chan *entry.Entry, d time.Duration) { + select { + case e := <-c: + require.FailNow(t, "Received unexpected message", "Message: %s", e.Body.(string)) + case <-time.After(d): + } +} + +// writes file with the specified file names and returns their full paths in order +func writeTempFiles(tempDir string, names []string) []string { + result := make([]string, 0, len(names)) + for _, name := range names { + path := filepath.Join(tempDir, name) + ioutil.WriteFile(path, []byte(name), 0755) + result = append(result, path) + } + return result +} From 6c757ec173fb64cfadd0b3ac182f922af151c84a Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Wed, 2 Jun 2021 12:00:18 -0400 Subject: [PATCH 3/4] Refactor file_input benchmarking and add additional cases for notable scenarios (#166) --- Makefile | 2 +- operator/builtin/input/file/benchmark_test.go | 152 +++++++++++++++--- operator/builtin/input/file/config.go | 1 + operator/builtin/input/file/util_test.go | 8 +- testutil/util.go | 3 +- 5 files changed, 139 insertions(+), 27 deletions(-) diff --git a/Makefile b/Makefile index e4d22914..be7c4156 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ test-only: .PHONY: bench bench: - $(MAKE) for-all CMD="go test -run=NONE -bench '.*' ./... -benchmem" + go test -benchmem -run=^$$ -bench ^* ./... 
.PHONY: clean clean: diff --git a/operator/builtin/input/file/benchmark_test.go b/operator/builtin/input/file/benchmark_test.go index f579d17b..5df09e48 100644 --- a/operator/builtin/input/file/benchmark_test.go +++ b/operator/builtin/input/file/benchmark_test.go @@ -15,6 +15,7 @@ package file import ( + "io/ioutil" "os" "path/filepath" "testing" @@ -27,33 +28,132 @@ import ( type fileInputBenchmark struct { name string - config *InputConfig + paths []string + config func() *InputConfig +} + +type benchFile struct { + *os.File + log func(int) +} + +func simpleTextFile(file *os.File) *benchFile { + line := stringWithLength(49) + "\n" + return &benchFile{ + File: file, + log: func(_ int) { file.WriteString(line) }, + } } func BenchmarkFileInput(b *testing.B) { cases := []fileInputBenchmark{ { - "Default", - NewInputConfig("test_id"), + name: "Single", + paths: []string{ + "file0.log", + }, + config: func() *InputConfig { + cfg := NewInputConfig("test_id") + cfg.Include = []string{ + "file0.log", + } + return cfg + }, + }, + { + name: "Glob", + paths: []string{ + "file0.log", + "file1.log", + "file2.log", + "file3.log", + }, + config: func() *InputConfig { + cfg := NewInputConfig("test_id") + cfg.Include = []string{"file*.log"} + return cfg + }, + }, + { + name: "MultiGlob", + paths: []string{ + "file0.log", + "file1.log", + "log0.log", + "log1.log", + }, + config: func() *InputConfig { + cfg := NewInputConfig("test_id") + cfg.Include = []string{ + "file*.log", + "log*.log", + } + return cfg + }, + }, + { + name: "MaxConcurrent", + paths: []string{ + "file0.log", + "file1.log", + "file2.log", + "file3.log", + }, + config: func() *InputConfig { + cfg := NewInputConfig("test_id") + cfg.Include = []string{ + "file*.log", + } + cfg.MaxConcurrentFiles = 1 + return cfg + }, }, { - "NoFileName", - func() *InputConfig { + name: "FngrPrntLarge", + paths: []string{ + "file0.log", + }, + config: func() *InputConfig { cfg := NewInputConfig("test_id") - cfg.IncludeFileName = false 
+ cfg.Include = []string{ + "file*.log", + } + cfg.FingerprintSize = 10 * defaultFingerprintSize return cfg - }(), + }, + }, + { + name: "FngrPrntSmall", + paths: []string{ + "file0.log", + }, + config: func() *InputConfig { + cfg := NewInputConfig("test_id") + cfg.Include = []string{ + "file*.log", + } + cfg.FingerprintSize = defaultFingerprintSize / 10 + return cfg + }, }, } - for _, tc := range cases { - b.Run(tc.name, func(b *testing.B) { - tempDir := testutil.NewTempDir(b) - path := filepath.Join(tempDir, "in.log") + for _, bench := range cases { + b.Run(bench.name, func(b *testing.B) { + rootDir, err := ioutil.TempDir("", "") + require.NoError(b, err) + + files := []*benchFile{} + for _, path := range bench.paths { + file := openFile(b, filepath.Join(rootDir, path)) + files = append(files, simpleTextFile(file)) + } - cfg := tc.config + cfg := bench.config() cfg.OutputIDs = []string{"fake"} - cfg.Include = []string{path} + for i, inc := range cfg.Include { + cfg.Include[i] = filepath.Join(rootDir, inc) + } cfg.StartAt = "beginning" ops, err := cfg.Build(testutil.NewBuildContext(b)) @@ -64,19 +164,29 @@ func BenchmarkFileInput(b *testing.B) { err = op.SetOutputs([]operator.Operator{fakeOutput}) require.NoError(b, err) + // write half the lines before starting + mid := b.N / 2 + for i := 0; i < mid; i++ { + for _, file := range files { + file.log(i) + } + } + + b.ResetTimer() err = op.Start(testutil.NewMockPersister("test")) defer op.Stop() require.NoError(b, err) - file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) - require.NoError(b, err) - - for i := 0; i < b.N; i++ { - file.WriteString("testlog\n") - } + // write the remainder of lines while running + go func() { + for i := mid; i < b.N; i++ { + for _, file := range files { + file.log(i) + } + } + }() - b.ResetTimer() - for i := 0; i < b.N; i++ { + for i := 0; i < b.N*len(files); i++ { <-fakeOutput.Received } }) diff --git a/operator/builtin/input/file/config.go 
b/operator/builtin/input/file/config.go index b806b3ac..27daaf16 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -42,6 +42,7 @@ func NewInputConfig(operatorID string) *InputConfig { IncludeFileName: true, IncludeFilePath: false, StartAt: "end", + FingerprintSize: defaultFingerprintSize, MaxLogSize: defaultMaxLogSize, MaxConcurrentFiles: defaultMaxConcurrentFiles, Encoding: helper.NewEncodingConfig(), diff --git a/operator/builtin/input/file/util_test.go b/operator/builtin/input/file/util_test.go index 37920f4d..eba5808f 100644 --- a/operator/builtin/input/file/util_test.go +++ b/operator/builtin/input/file/util_test.go @@ -64,10 +64,10 @@ func newTestFileOperator(t *testing.T, cfgMod func(*InputConfig), outMod func(*t return op.(*InputOperator), fakeOutput.Received, tempDir } -func openFile(t testing.TB, path string) *os.File { - file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0777) - require.NoError(t, err) - t.Cleanup(func() { _ = file.Close() }) +func openFile(tb testing.TB, path string) *os.File { + file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600) + require.NoError(tb, err) + tb.Cleanup(func() { _ = file.Close() }) return file } diff --git a/testutil/util.go b/testutil/util.go index 9e186eff..4078d975 100644 --- a/testutil/util.go +++ b/testutil/util.go @@ -22,6 +22,7 @@ import ( "sync" "testing" + "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" "github.com/open-telemetry/opentelemetry-log-collection/logger" @@ -46,7 +47,7 @@ func NewTempDir(t testing.TB) string { // NewBuildContext will return a new build context for testing func NewBuildContext(t testing.TB) operator.BuildContext { return operator.BuildContext{ - Logger: logger.New(zaptest.NewLogger(t).Sugar()), + Logger: logger.New(zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel)).Sugar()), Namespace: "$", } } From 8312a954fc467b6beab34656e324b3c3dc697336 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 8 Jun 2021 11:24:33 
-0400 Subject: [PATCH 4/4] Add codecov automation (#167) * Add codecov automation * Rename codecov check * Add make test-coverage --- .github/codecov.yaml | 20 ++++++++++++++ .github/workflows/codecov.yml | 50 +++++++++++++++++++++++++++++++++++ .gitignore | 3 +++ Makefile | 21 ++++++++++++--- README.md | 3 +++ 5 files changed, 93 insertions(+), 4 deletions(-) create mode 100644 .github/codecov.yaml create mode 100644 .github/workflows/codecov.yml diff --git a/.github/codecov.yaml b/.github/codecov.yaml new file mode 100644 index 00000000..5960f02b --- /dev/null +++ b/.github/codecov.yaml @@ -0,0 +1,20 @@ +codecov: + require_ci_to_pass: yes + +ignore: + - "internal/tools/*" + +coverage: + precision: 1 + round: down + range: "70...100" + status: + project: + default: + target: auto + threshold: 0.5% + +comment: + layout: "reach,diff,flags,tree" + behavior: default + require_changes: yes \ No newline at end of file diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml new file mode 100644 index 00000000..6f67ca2a --- /dev/null +++ b/.github/workflows/codecov.yml @@ -0,0 +1,50 @@ +name: Run CodeCov +on: + push: + branches: + - main + pull_request: +env: + # Path to where test results will be saved. + TEST_RESULTS: /tmp/test-results + # Default minimum version of Go to support. 
+ DEFAULT_GO_VERSION: 1.15 +jobs: + test-coverage: + runs-on: ubuntu-latest + steps: + - name: Install Go + uses: actions/setup-go@v2.1.3 + with: + go-version: ${{ env.DEFAULT_GO_VERSION }} + - name: Checkout Repo + uses: actions/checkout@v2 + - name: Setup Environment + run: | + echo "GOPATH=$(go env GOPATH)" >> $GITHUB_ENV + echo "$(go env GOPATH)/bin" >> $GITHUB_PATH + - name: Module cache + uses: actions/cache@v2.1.6 + env: + cache-name: go-mod-cache + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-${{ env.cache-name }}-${{ hashFiles('**/go.sum') }} + - name: Run coverage tests + run: | + make test-coverage + mkdir $TEST_RESULTS + cp coverage.out $TEST_RESULTS + cp coverage.txt $TEST_RESULTS + cp coverage.html $TEST_RESULTS + - name: Upload coverage report + uses: codecov/codecov-action@v1 + with: + file: ./coverage.txt + fail_ci_if_error: true + verbose: true + - name: Store coverage test output + uses: actions/upload-artifact@v2 + with: + name: opentelemetry-log-collection-test-output + path: ${{ env.TEST_RESULTS }} diff --git a/.gitignore b/.gitignore index 7d5a9674..f971b5d4 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ artifacts/* .vscode/* gen/ .idea/* +coverage.out +coverage.out.bak +coverage.txt.bak diff --git a/Makefile b/Makefile index be7c4156..5406bae9 100644 --- a/Makefile +++ b/Makefile @@ -4,10 +4,10 @@ GOARCH=$(shell go env GOARCH) GIT_SHA=$(shell git rev-parse --short HEAD) PROJECT_ROOT = $(shell pwd) -ARTIFACTS = ${PROJECT_ROOT}/artifacts ALL_MODULES := $(shell find . -type f -name "go.mod" -exec dirname {} \; | sort ) ALL_SRC := $(shell find . -name '*.go' -type f | sort) ADDLICENSE=addlicense +ALL_COVERAGE_MOD_DIRS := $(shell find . -type f -name 'go.mod' -exec dirname {} \; | egrep -v '^./internal/tools' | sort) TOOLS_MOD_DIR := ./internal/tools .PHONY: install-tools @@ -23,14 +23,27 @@ test: vet test-only test-only: $(MAKE) for-all CMD="go test -race -coverprofile coverage.txt -coverpkg ./... ./..." 
+.PHONY: test-coverage +test-coverage: clean + @set -e; \ + printf "" > coverage.txt; \ + for dir in $(ALL_COVERAGE_MOD_DIRS); do \ + (cd "$${dir}" && \ + go list ./... \ + | grep -v third_party \ + | xargs go test -coverpkg=./... -covermode=atomic -coverprofile=coverage.out && \ + go tool cover -html=coverage.out -o coverage.html); \ + [ -f "$${dir}/coverage.out" ] && cat "$${dir}/coverage.out" >> coverage.txt; \ + done; \ + sed -i.bak -e '2,$$ { /^mode: /d; }' coverage.txt + .PHONY: bench bench: go test -benchmem -run=^$$ -bench ^* ./... .PHONY: clean clean: - rm -fr ./artifacts - $(MAKE) for-all CMD="rm -f coverage.txt coverage.html" + $(MAKE) for-all CMD="rm -f coverage.txt.* coverage.html coverage.out" .PHONY: tidy tidy: @@ -91,4 +104,4 @@ for-all: done .PHONY: ci-check -ci-check: vet lint check-license \ No newline at end of file +ci-check: vet lint check-license diff --git a/README.md b/README.md index 1f1dd2cf..13c5014d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,7 @@ # opentelemetry-log-collection + +[![codecov.io](https://codecov.io/gh/open-telemetry/opentelemetry-log-collection/coverage.svg?branch=main)](https://app.codecov.io/gh/open-telemetry/opentelemetry-log-collection?branch=main) + ## Status This project was originally developed by [observIQ](https://observiq.com/) under the name [Stanza](https://github.com/observIQ/stanza). It has been contributed to the OpenTelemetry project in order to accelerate development of the collector's log collection capabilities.