Skip to content

Commit

Permalink
Fix default output (open-telemetry#186)
Browse files Browse the repository at this point in the history
* Fix defaultoutput

* Implement PR feedback
  • Loading branch information
Mrod1598 authored Jun 15, 2021
1 parent cc61797 commit 44b6bf5
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 18 deletions.
4 changes: 4 additions & 0 deletions agent/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) {
b.config = cfgs
}

if len(b.config.Pipeline) == 0 {
return nil, errors.NewError("empty pipeline not allowed", "")
}

sampledLogger := b.logger.Desugar().WithOptions(
zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSamplerWithOptions(core, time.Second, 1, 10000)
Expand Down
63 changes: 59 additions & 4 deletions agent/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,41 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/noop"
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
)

func TestBuildAgentSuccess(t *testing.T) {
mockCfg := Config{}
mockCfg := Config{
[]operator.Config{
{
Builder: noop.NewNoopOperatorConfig("noop"),
},
},
}
mockLogger := zap.NewNop().Sugar()
mockPluginDir := "/some/path/plugins"

agent, err := NewBuilder(mockLogger).
WithConfig(&mockCfg).
WithPluginDir(mockPluginDir).
Build()
require.NoError(t, err)
require.Equal(t, mockLogger, agent.SugaredLogger)
}

func TestBuildAgentDefaultOperator(t *testing.T) {
mockCfg := Config{
[]operator.Config{
{
Builder: noop.NewNoopOperatorConfig("noop"),
},
{
Builder: noop.NewNoopOperatorConfig("noop1"),
},
},
}
mockLogger := zap.NewNop().Sugar()
mockPluginDir := "/some/path/plugins"
mockOutput := testutil.NewFakeOutput(t)
Expand All @@ -36,20 +66,45 @@ func TestBuildAgentSuccess(t *testing.T) {
Build()
require.NoError(t, err)
require.Equal(t, mockLogger, agent.SugaredLogger)

ops := agent.pipeline.Operators()
require.Equal(t, 3, len(ops))

exists := make(map[string]bool)

for _, op := range ops {
switch op.ID() {
case "$.noop":
require.Equal(t, 1, len(op.GetOutputIDs()))
require.Equal(t, "$.noop1", op.GetOutputIDs()[0])
exists["$.noop"] = true
case "$.noop1":
require.Equal(t, 1, len(op.GetOutputIDs()))
require.Equal(t, "$.fake", op.GetOutputIDs()[0])
exists["$.noop1"] = true
case "$.fake":
require.Equal(t, 0, len(op.GetOutputIDs()))
exists["$.fake"] = true
}
}
require.True(t, exists["$.noop"])
require.True(t, exists["$.noop1"])
require.True(t, exists["$.fake"])
}

func TestBuildAgentFailureOnPluginRegistry(t *testing.T) {
mockCfg := Config{}
mockLogger := zap.NewNop().Sugar()
mockPluginDir := "[]"
mockOutput := testutil.NewFakeOutput(t)

_, err := NewBuilder(mockLogger).
agent, err := NewBuilder(mockLogger).
WithConfig(&mockCfg).
WithPluginDir(mockPluginDir).
WithDefaultOutput(mockOutput).
Build()
require.NoError(t, err)
require.Error(t, err)
require.Contains(t, err.Error(), "empty pipeline not allowed")
require.Nil(t, agent)
}

func TestBuildAgentFailureNoConfigOrGlobs(t *testing.T) {
Expand Down
17 changes: 7 additions & 10 deletions pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
type Config []operator.Config

// BuildOperators builds the operators from the list of configs into operators.
func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, error) {
func (c Config) BuildOperators(bc operator.BuildContext, defaultOperator operator.Operator) ([]operator.Operator, error) {
// buildsMulti's key represents an operator's ID that builds multiple operators, e.g. Plugins.
// the value is the plugin's first operator's ID.
buildsMulti := make(map[string]string)
Expand All @@ -38,6 +38,11 @@ func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, e
}
operators = append(operators, op...)
}

if defaultOperator != nil && operators[len(operators)-1].CanOutput() {
operators = append(operators, defaultOperator)
}

if err := SetOutputIDs(operators, buildsMulti); err != nil {
return nil, err
}
Expand All @@ -47,19 +52,11 @@ func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, e

// BuildPipeline will build a pipeline from the config.
func (c Config) BuildPipeline(bc operator.BuildContext, defaultOperator operator.Operator) (*DirectedPipeline, error) {
if defaultOperator != nil {
bc.DefaultOutputIDs = []string{defaultOperator.ID()}
}

operators, err := c.BuildOperators(bc)
operators, err := c.BuildOperators(bc, defaultOperator)
if err != nil {
return nil, err
}

if defaultOperator != nil {
operators = append(operators, defaultOperator)
}

return NewDirectedPipeline(operators)
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *Config) Build(bc operator.BuildContext) ([]operator.Operator, error) {
}

nbc := bc.WithSubNamespace(c.ID()).WithIncrementedDepth()
return pipelineConfig.Pipeline.BuildOperators(nbc)
return pipelineConfig.Pipeline.BuildOperators(nbc, nil)
}

func (c *Config) BuildsMultipleOps() bool { return true }
Expand Down
6 changes: 3 additions & 3 deletions plugin/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pipeline:
}

for _, tc := range cases {
ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t))
ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t), nil)
require.NoError(t, err)

require.Len(t, ops, len(tc.ExpectedOpIDs))
Expand Down Expand Up @@ -553,7 +553,7 @@ pipeline:

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t))
ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t), nil)
require.NoError(t, err)

for i, op := range ops {
Expand Down Expand Up @@ -595,7 +595,7 @@ pipeline:
err = yaml.Unmarshal(pipelineConfig, &pipeline)
require.NoError(t, err)

_, err = pipeline.BuildOperators(operator.NewBuildContext(zaptest.NewLogger(t).Sugar()))
_, err = pipeline.BuildOperators(operator.NewBuildContext(zaptest.NewLogger(t).Sugar()), nil)
require.Error(t, err)
require.Contains(t, err.Error(), "reached max plugin depth")
}

0 comments on commit 44b6bf5

Please sign in to comment.