Skip to content

Commit

Permalink
Increased test coverage for the plugin package
Browse files Browse the repository at this point in the history
  • Loading branch information
jmwilliams89 committed Sep 8, 2020
1 parent e19f879 commit 8c4ab22
Show file tree
Hide file tree
Showing 10 changed files with 242 additions and 93 deletions.
10 changes: 5 additions & 5 deletions agent/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/observiq/stanza/database"
"github.com/observiq/stanza/errors"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/plugin"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -49,18 +50,17 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) {
return nil, errors.Wrap(err, "open database")
}

registry, err := operator.NewPluginRegistry(b.pluginDir)
registry, err := plugin.NewPluginRegistry(b.pluginDir)
if err != nil {
return nil, errors.Wrap(err, "load plugin registry")
}

buildContext := operator.BuildContext{
Logger: b.logger,
PluginRegistry: registry,
Database: db,
Logger: b.logger,
Database: db,
}

pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput)
pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, registry, b.defaultOutput)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions operator/builtin/input/generate/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"github.com/observiq/stanza/plugin"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -66,7 +67,7 @@ pipeline:
tmpl, err := template.New("my_generator").Parse(templateText)
require.NoError(t, err)

registry := operator.PluginRegistry{
registry := plugin.Registry{
"sample": tmpl,
}

Expand All @@ -76,7 +77,7 @@ pipeline:
config, err := registry.Render("sample", params)
require.NoError(t, err)

expectedConfig := operator.PluginConfig{
expectedConfig := plugin.Plugin{
Pipeline: []operator.Config{
{
Builder: &GenerateInputConfig{
Expand Down
7 changes: 3 additions & 4 deletions operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ type Builder interface {

// BuildContext supplies contextual resources when building an operator.
type BuildContext struct {
PluginRegistry PluginRegistry
Database database.Database
Parameters map[string]interface{}
Logger *zap.SugaredLogger
Database database.Database
Parameters map[string]interface{}
Logger *zap.SugaredLogger
}

// registry is a global registry of operator types to operator builders.
Expand Down
11 changes: 6 additions & 5 deletions pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
"github.com/observiq/stanza/errors"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"github.com/observiq/stanza/plugin"
yaml "gopkg.in/yaml.v2"
)

// Config is the configuration of a pipeline.
type Config []Params

// BuildPipeline will build a pipeline from the config.
func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput operator.Operator) (*DirectedPipeline, error) {
operatorConfigs, err := c.buildOperatorConfigs(context.PluginRegistry)
func (c Config) BuildPipeline(context operator.BuildContext, pluginRegistry plugin.Registry, defaultOutput operator.Operator) (*DirectedPipeline, error) {
operatorConfigs, err := c.buildOperatorConfigs(pluginRegistry)
if err != nil {
return nil, err
}
Expand All @@ -37,7 +38,7 @@ func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput opera
return pipeline, nil
}

func (c Config) buildOperatorConfigs(pluginRegistry operator.PluginRegistry) ([]operator.Config, error) {
func (c Config) buildOperatorConfigs(pluginRegistry plugin.Registry) ([]operator.Config, error) {
operatorConfigs := make([]operator.Config, 0, len(c))

for i, params := range c {
Expand Down Expand Up @@ -194,7 +195,7 @@ func (p Params) getStringArray(key string) []string {
}

// BuildConfigs will build operator configs from a params map.
func (p Params) BuildConfigs(pluginRegistry operator.PluginRegistry, namespace string, defaultOutput []string) ([]operator.Config, error) {
func (p Params) BuildConfigs(pluginRegistry plugin.Registry, namespace string, defaultOutput []string) ([]operator.Config, error) {
if operator.IsDefined(p.Type()) {
return p.buildAsBuiltin(namespace)
}
Expand Down Expand Up @@ -232,7 +233,7 @@ func (p Params) buildAsBuiltin(namespace string) ([]operator.Config, error) {
}

// buildPlugin will build a plugin config from a params map.
func (p Params) buildPlugin(pluginRegistry operator.PluginRegistry, namespace string, defaultOutput []string) ([]operator.Config, error) {
func (p Params) buildPlugin(pluginRegistry plugin.Registry, namespace string, defaultOutput []string) ([]operator.Config, error) {
templateParams := map[string]interface{}{}
for key, value := range p {
templateParams[key] = value
Expand Down
35 changes: 20 additions & 15 deletions pipeline/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/observiq/stanza/operator/builtin/input/generate"
"github.com/observiq/stanza/operator/builtin/output/drop"
_ "github.com/observiq/stanza/operator/builtin/transformer/noop"
"github.com/observiq/stanza/plugin"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -201,7 +202,8 @@ pipeline:
message: test
output: {{.output}}
`
err := context.PluginRegistry.Add("plugin", pluginTemplate)
registry := plugin.Registry{}
err := registry.Add("plugin", pluginTemplate)
require.NoError(t, err)

pipelineConfig := Config{
Expand All @@ -216,7 +218,7 @@ pipeline:
},
}

_, err = pipelineConfig.BuildPipeline(context, nil)
_, err = pipelineConfig.BuildPipeline(context, registry, nil)
require.NoError(t, err)
}

Expand All @@ -239,7 +241,7 @@ func TestBuildValidPipelineDefaultOutput(t *testing.T) {
defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context)
require.NoError(t, err)

pl, err := pipelineConfig.BuildPipeline(context, defaultOutput)
pl, err := pipelineConfig.BuildPipeline(context, nil, defaultOutput)
require.NoError(t, err)
require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.generate_input"), createNodeID("$.drop_it")))
}
Expand Down Expand Up @@ -267,7 +269,7 @@ func TestBuildValidPipelineNextOutputAndDefaultOutput(t *testing.T) {
defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context)
require.NoError(t, err)

pl, err := pipelineConfig.BuildPipeline(context, defaultOutput)
pl, err := pipelineConfig.BuildPipeline(context, nil, defaultOutput)
require.NoError(t, err)
require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.generate_input"), createNodeID("$.noop")))
require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.noop"), createNodeID("$.drop_it")))
Expand All @@ -284,7 +286,8 @@ pipeline:
record:
message: test
`
err := context.PluginRegistry.Add("plugin", pluginTemplate)
registry := plugin.Registry{}
err := registry.Add("plugin", pluginTemplate)
require.NoError(t, err)

pipelineConfig := Config{
Expand All @@ -297,7 +300,7 @@ pipeline:
defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context)
require.NoError(t, err)

pl, err := pipelineConfig.BuildPipeline(context, defaultOutput)
pl, err := pipelineConfig.BuildPipeline(context, registry, defaultOutput)
require.NoError(t, err)
require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.plugin.plugin_generate"), createNodeID("$.drop_it")))
}
Expand All @@ -317,7 +320,7 @@ func TestBuildInvalidPipelineInvalidType(t *testing.T) {
},
}

_, err := pipelineConfig.BuildPipeline(context, nil)
_, err := pipelineConfig.BuildPipeline(context, nil, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "unsupported `type` for operator config")
}
Expand All @@ -333,7 +336,8 @@ pipeline:
message: test
output: {{.output}}
`
err := context.PluginRegistry.Add("plugin", pluginTemplate)
registry := plugin.Registry{}
err := registry.Add("plugin", pluginTemplate)
require.NoError(t, err)

pipelineConfig := Config{
Expand All @@ -348,7 +352,7 @@ pipeline:
},
}

_, err = pipelineConfig.BuildPipeline(context, nil)
_, err = pipelineConfig.BuildPipeline(context, registry, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "build operator configs")
}
Expand All @@ -368,7 +372,7 @@ func TestBuildInvalidPipelineInvalidOperator(t *testing.T) {
}

context := testutil.NewBuildContext(t)
_, err := pipelineConfig.BuildPipeline(context, nil)
_, err := pipelineConfig.BuildPipeline(context, nil, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "field number not found")
}
Expand All @@ -393,7 +397,7 @@ func TestBuildInvalidPipelineInvalidGraph(t *testing.T) {
}

context := testutil.NewBuildContext(t)
_, err := pipelineConfig.BuildPipeline(context, nil)
_, err := pipelineConfig.BuildPipeline(context, nil, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "does not exist")
}
Expand All @@ -413,7 +417,8 @@ pipeline:
record: test
output: {{.output}}
`
err := context.PluginRegistry.Add("plugin", pluginTemplate)
registry := plugin.Registry{}
err := registry.Add("plugin", pluginTemplate)
require.NoError(t, err)

config := Config{
Expand All @@ -427,7 +432,7 @@ pipeline:
},
}

configs, err := config.buildOperatorConfigs(context.PluginRegistry)
configs, err := config.buildOperatorConfigs(registry)
require.NoError(t, err)
require.Len(t, configs, 3)

Expand Down Expand Up @@ -526,7 +531,7 @@ func TestBuildPipelineWithFailingOperator(t *testing.T) {
config := Config{
{"type": "invalid_operator"},
}
_, err := config.BuildPipeline(ctx, nil)
_, err := config.BuildPipeline(ctx, nil, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to build operator")
}
Expand All @@ -536,7 +541,7 @@ func TestBuildPipelineWithInvalidParam(t *testing.T) {
config := Config{
{"missing": "type"},
}
_, err := config.BuildPipeline(ctx, nil)
_, err := config.BuildPipeline(ctx, nil, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "missing required `type` field")
}
Expand Down
24 changes: 12 additions & 12 deletions operator/plugin_parameter.go → plugin/parameter.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package operator
package plugin

import (
"fmt"

"github.com/observiq/stanza/errors"
)

// PluginParameter is a basic description of a plugin's parameter.
type PluginParameter struct {
// Parameter is a basic description of a plugin's parameter.
type Parameter struct {
Label string
Description string
Required bool
Expand All @@ -16,7 +16,7 @@ type PluginParameter struct {
Default interface{} // Must be valid according to Type & ValidValues
}

func (param PluginParameter) validate() error {
func (param Parameter) validate() error {
if param.Required && param.Default != nil {
return errors.NewError(
"required parameter cannot have a default value",
Expand All @@ -39,7 +39,7 @@ func (param PluginParameter) validate() error {
return nil
}

func (param PluginParameter) validateType() error {
func (param Parameter) validateType() error {
switch param.Type {
case "string", "int", "bool", "strings", "enum": // ok
default:
Expand All @@ -51,7 +51,7 @@ func (param PluginParameter) validateType() error {
return nil
}

func (param PluginParameter) validateValidValues() error {
func (param Parameter) validateValidValues() error {
switch param.Type {
case "string", "int", "bool", "strings":
if len(param.ValidValues) > 0 {
Expand All @@ -71,7 +71,7 @@ func (param PluginParameter) validateValidValues() error {
return nil
}

func (param PluginParameter) validateDefault() error {
func (param Parameter) validateDefault() error {
if param.Default == nil {
return nil
}
Expand All @@ -96,7 +96,7 @@ func (param PluginParameter) validateDefault() error {
}
}

func validateStringDefault(param PluginParameter) error {
func validateStringDefault(param Parameter) error {
if _, ok := param.Default.(string); !ok {
return errors.NewError(
"default value for a parameter of type 'string' must be a string",
Expand All @@ -106,7 +106,7 @@ func validateStringDefault(param PluginParameter) error {
return nil
}

func validateIntDefault(param PluginParameter) error {
func validateIntDefault(param Parameter) error {
switch param.Default.(type) {
case int, int32, int64:
return nil
Expand All @@ -118,7 +118,7 @@ func validateIntDefault(param PluginParameter) error {
}
}

func validateBoolDefault(param PluginParameter) error {
func validateBoolDefault(param Parameter) error {
if _, ok := param.Default.(bool); !ok {
return errors.NewError(
"default value for a parameter of type 'bool' must be a boolean",
Expand All @@ -128,7 +128,7 @@ func validateBoolDefault(param PluginParameter) error {
return nil
}

func validateStringArrayDefault(param PluginParameter) error {
func validateStringArrayDefault(param Parameter) error {
defaultList, ok := param.Default.([]interface{})
if !ok {
return errors.NewError(
Expand All @@ -147,7 +147,7 @@ func validateStringArrayDefault(param PluginParameter) error {
return nil
}

func validateEnumDefault(param PluginParameter) error {
func validateEnumDefault(param Parameter) error {
def, ok := param.Default.(string)
if !ok {
return errors.NewError(
Expand Down
Loading

0 comments on commit 8c4ab22

Please sign in to comment.