From 16358e3e828e8d455b4f962da029569153e9a6f5 Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Mon, 7 Sep 2020 15:20:02 -0400 Subject: [PATCH 01/13] Completed test coverage for entry and agent packages --- agent/agent.go | 112 +++------------------- agent/agent_test.go | 119 +++++++++++++++-------- agent/builder.go | 72 ++++++++++++++ agent/builder_test.go | 85 +++++++++++++++++ agent/config_test.go | 114 ++++++++++++++++++++++ cmd/stanza/graph.go | 2 +- cmd/stanza/offsets.go | 6 +- cmd/stanza/service.go | 11 ++- entry/entry_test.go | 41 ++++++++ entry/field_test.go | 37 ++++++++ entry/nil_field_test.go | 37 ++++++++ entry/record_field.go | 6 +- entry/record_field_test.go | 82 +++++++++++++++- operator/config.go | 31 ------ operator/database.go | 63 ++++++++++++ operator/database_test.go | 38 ++++++++ pipeline/config.go | 4 +- pipeline/directed.go | 190 +++++++++++++++++++++++++++++++++++++ pipeline/pipeline.go | 189 ++---------------------------------- pipeline/pipeline_test.go | 12 +-- testutil/database.go | 69 ++++++++++++++ testutil/pipeline.go | 75 +++++++++++++++ 22 files changed, 1028 insertions(+), 367 deletions(-) create mode 100644 agent/builder.go create mode 100644 agent/builder_test.go create mode 100644 agent/config_test.go create mode 100644 entry/nil_field_test.go create mode 100644 operator/database.go create mode 100644 operator/database_test.go create mode 100644 pipeline/directed.go create mode 100644 testutil/database.go create mode 100644 testutil/pipeline.go diff --git a/agent/agent.go b/agent/agent.go index 5f243c62b..6ca296814 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1,23 +1,17 @@ package agent import ( - "fmt" - "os" - "path/filepath" "sync" - "time" - "github.com/observiq/stanza/errors" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/pipeline" - "go.etcd.io/bbolt" "go.uber.org/zap" ) // LogAgent is an entity that handles log monitoring. type LogAgent struct { database operator.Database - pipeline *pipeline.Pipeline + pipeline pipeline.Pipeline startOnce sync.Once stopOnce sync.Once @@ -25,109 +19,29 @@ type LogAgent struct { *zap.SugaredLogger } -// Start will start the log monitoring process. +// Start will start the log monitoring process func (a *LogAgent) Start() (err error) { a.startOnce.Do(func() { err = a.pipeline.Start() if err != nil { return } - a.Info("Agent started") }) return } -// Stop will stop the log monitoring process. -func (a *LogAgent) Stop() { +// Stop will stop the log monitoring process +func (a *LogAgent) Stop() (err error) { a.stopOnce.Do(func() { - a.pipeline.Stop() - a.database.Close() - a.Info("Agent stopped") - }) -} - -// OpenDatabase will open and create a database. -func OpenDatabase(file string) (operator.Database, error) { - if file == "" { - return operator.NewStubDatabase(), nil - } - - if _, err := os.Stat(filepath.Dir(file)); err != nil { - if os.IsNotExist(err) { - err := os.MkdirAll(filepath.Dir(file), 0755) - if err != nil { - return nil, fmt.Errorf("creating database directory: %s", err) - } - } else { - return nil, err + err = a.pipeline.Stop() + if err != nil { + return } - } - - options := &bbolt.Options{Timeout: 1 * time.Second} - return bbolt.Open(file, 0666, options) -} - -// LogAgentBuilder is a construct used to build a log agent -type LogAgentBuilder struct { - cfg *Config - logger *zap.SugaredLogger - pluginDir string - databaseFile string - defaultOutput operator.Operator -} - -// NewBuilder creates a new LogAgentBuilder -func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { - return &LogAgentBuilder{ - cfg: cfg, - logger: logger, - } -} - -// WithPluginDir adds the specified plugin directory when building a log agent -func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { - b.pluginDir = pluginDir - return b -} -// WithDatabaseFile adds the specified database file when building a log agent -func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { - b.databaseFile = databaseFile - return b -} - -// WithDefaultOutput adds a default output when building a log agent -func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { - b.defaultOutput = defaultOutput - return b -} - -// Build will build a new log agent using the values defined on the builder -func (b *LogAgentBuilder) Build() (*LogAgent, error) { - database, err := OpenDatabase(b.databaseFile) - if err != nil { - return nil, errors.Wrap(err, "open database") - } - - registry, err := operator.NewPluginRegistry(b.pluginDir) - if err != nil { - return nil, errors.Wrap(err, "load plugin registry") - } - - buildContext := operator.BuildContext{ - Logger: b.logger, - PluginRegistry: registry, - Database: database, - } - - pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) - if err != nil { - return nil, err - } - - return &LogAgent{ - pipeline: pipeline, - database: database, - SugaredLogger: b.logger, - }, nil + err = a.database.Close() + if err != nil { + return + } + }) + return } diff --git a/agent/agent_test.go b/agent/agent_test.go index 925b764c8..c8e536462 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1,9 +1,7 @@ package agent import ( - "os" - "path/filepath" - "runtime" + "fmt" "testing" "github.com/observiq/stanza/testutil" @@ -11,44 +9,87 @@ import ( "go.uber.org/zap" ) -func TestNewLogAgent(t *testing.T) { - mockCfg := Config{} - mockLogger := zap.NewNop().Sugar() - mockPluginDir := "/some/path/plugins" - mockDatabaseFile := "" - agent, err := NewBuilder(&mockCfg, mockLogger). - WithPluginDir(mockPluginDir). - WithDatabaseFile(mockDatabaseFile). - Build() +func TestStartAgentSuccess(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Start").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + } + err := agent.Start() require.NoError(t, err) + pipeline.AssertCalled(t, "Start") +} + +func TestStartAgentFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + failure := fmt.Errorf("failed to start pipeline") + pipeline.On("Start").Return(failure) - require.Equal(t, mockLogger, agent.SugaredLogger) + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + } + err := agent.Start() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Start") } -func TestOpenDatabase(t *testing.T) { - t.Run("Simple", func(t *testing.T) { - tempDir := testutil.NewTempDir(t) - db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) - require.NoError(t, err) - require.NotNil(t, db) - }) - - t.Run("NonexistantPathIsCreated", func(t *testing.T) { - tempDir := testutil.NewTempDir(t) - db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) - require.NoError(t, err) - require.NotNil(t, db) - }) - - t.Run("BadPermissions", func(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Windows does not have the same kind of file permissions") - } - tempDir := testutil.NewTempDir(t) - err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) - require.NoError(t, err) - db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) - require.Error(t, err) - require.Nil(t, db) - }) +func TestStopAgentSuccess(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Stop").Return(nil) + database := &testutil.Database{} + database.On("Close").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.NoError(t, err) + pipeline.AssertCalled(t, "Stop") + database.AssertCalled(t, "Close") +} + +func TestStopAgentPipelineFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + failure := fmt.Errorf("failed to start pipeline") + pipeline.On("Stop").Return(failure) + database := &testutil.Database{} + database.On("Close").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Stop") + database.AssertNotCalled(t, "Close") +} + +func TestStopAgentDatabaseFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Stop").Return(nil) + database := &testutil.Database{} + failure := fmt.Errorf("failed to close database") + database.On("Close").Return(failure) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Stop") + database.AssertCalled(t, "Close") } diff --git a/agent/builder.go b/agent/builder.go new file mode 100644 index 000000000..8938f9dac --- /dev/null +++ b/agent/builder.go @@ -0,0 +1,72 @@ +package agent + +import ( + "github.com/observiq/stanza/errors" + "github.com/observiq/stanza/operator" + "go.uber.org/zap" +) + +// LogAgentBuilder is a construct used to build a log agent +type LogAgentBuilder struct { + cfg *Config + logger *zap.SugaredLogger + pluginDir string + databaseFile string + defaultOutput operator.Operator +} + +// NewBuilder creates a new LogAgentBuilder +func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { + return &LogAgentBuilder{ + cfg: cfg, + logger: logger, + } +} + +// WithPluginDir adds the specified plugin directory when building a log agent +func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { + b.pluginDir = pluginDir + return b +} + +// WithDatabaseFile adds the specified database file when building a log agent +func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { + b.databaseFile = databaseFile + return b +} + +// WithDefaultOutput adds a default output when building a log agent +func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { + b.defaultOutput = defaultOutput + return b +} + +// Build will build a new log agent using the values defined on the builder +func (b *LogAgentBuilder) Build() (*LogAgent, error) { + database, err := operator.OpenDatabase(b.databaseFile) + if err != nil { + return nil, errors.Wrap(err, "open database") + } + + registry, err := operator.NewPluginRegistry(b.pluginDir) + if err != nil { + return nil, errors.Wrap(err, "load plugin registry") + } + + buildContext := operator.BuildContext{ + Logger: b.logger, + PluginRegistry: registry, + Database: database, + } + + pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) + if err != nil { + return nil, err + } + + return &LogAgent{ + pipeline: pipeline, + database: database, + SugaredLogger: b.logger, + }, nil +} diff --git a/agent/builder_test.go b/agent/builder_test.go new file mode 100644 index 000000000..352844918 --- /dev/null +++ b/agent/builder_test.go @@ -0,0 +1,85 @@ +package agent + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/observiq/stanza/pipeline" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestBuildAgentSuccess(t *testing.T) { + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.NoError(t, err) + require.Equal(t, mockLogger, agent.SugaredLogger) +} + +func TestBuildAgentFailureOnDatabase(t *testing.T) { + tempDir := testutil.NewTempDir(t) + invalidDatabaseFile := filepath.Join(tempDir, "test.db") + err := ioutil.WriteFile(invalidDatabaseFile, []byte("invalid"), 0755) + require.NoError(t, err) + + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := invalidDatabaseFile + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.Error(t, err) + require.Nil(t, agent) +} + +func TestBuildAgentFailureOnPluginRegistry(t *testing.T) { + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "[]" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.Error(t, err) + require.Nil(t, agent) +} + +func TestBuildAgentFailureOnPipeline(t *testing.T) { + mockCfg := Config{ + Pipeline: pipeline.Config{ + pipeline.Params{"type": "missing"}, + }, + } + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.Error(t, err) + require.Nil(t, agent) +} diff --git a/agent/config_test.go b/agent/config_test.go new file mode 100644 index 000000000..77fdbf094 --- /dev/null +++ b/agent/config_test.go @@ -0,0 +1,114 @@ +package agent + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/observiq/stanza/pipeline" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" +) + +func TestNewConfigFromFile(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + - type: operator +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + config, err := NewConfigFromFile(configFile) + require.NoError(t, err) + require.Equal(t, len(config.Pipeline), 1) +} + +func TestNewConfigWithMissingFile(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + + _, err := NewConfigFromFile(configFile) + require.Error(t, err) + require.Contains(t, err.Error(), "no such file or directory") +} + +func TestNewConfigWithInvalidYAML(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + invalid: structure +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + _, err = NewConfigFromFile(configFile) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to read config file as yaml") +} + +func TestNewConfigFromGlobs(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + - type: operator +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + globs := []string{filepath.Join(tempDir, "*.yaml")} + config, err := NewConfigFromGlobs(globs) + require.NoError(t, err) + require.Equal(t, len(config.Pipeline), 1) +} + +func TestNewConfigFromGlobsWithInvalidGlob(t *testing.T) { + globs := []string{"[]"} + _, err := NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "syntax error in pattern") +} + +func TestNewConfigFromGlobsWithNoMatches(t *testing.T) { + tempDir := testutil.NewTempDir(t) + globs := []string{filepath.Join(tempDir, "*.yaml")} + _, err := NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "No config files found") +} + +func TestNewConfigFromGlobsWithInvalidConfig(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + invalid: structure +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + globs := []string{filepath.Join(tempDir, "*.yaml")} + _, err = NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to read config file as yaml") +} + +func TestMergeConfigs(t *testing.T) { + config1 := Config{ + Pipeline: pipeline.Config{ + {"type": "first"}, + }, + } + + config2 := Config{ + Pipeline: pipeline.Config{ + {"type": "second"}, + }, + } + + config3 := mergeConfigs(&config1, &config2) + require.Equal(t, len(config3.Pipeline), 2) +} diff --git a/cmd/stanza/graph.go b/cmd/stanza/graph.go index 3c1beb55e..fbae1cf19 100644 --- a/cmd/stanza/graph.go +++ b/cmd/stanza/graph.go @@ -59,7 +59,7 @@ func runGraph(_ *cobra.Command, _ []string, flags *RootFlags) { os.Exit(1) } - dotGraph, err := pipeline.MarshalDot() + dotGraph, err := pipeline.Render() if err != nil { logger.Errorw("Failed to marshal dot graph", zap.Any("error", err)) os.Exit(1) diff --git a/cmd/stanza/offsets.go b/cmd/stanza/offsets.go index a013d2c0c..1dc6fcec1 100644 --- a/cmd/stanza/offsets.go +++ b/cmd/stanza/offsets.go @@ -5,7 +5,7 @@ import ( "io" "os" - agent "github.com/observiq/stanza/agent" + "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" "github.com/spf13/cobra" "go.etcd.io/bbolt" @@ -39,7 +39,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command { Short: "Clear persisted offsets from the database", Args: cobra.ArbitraryArgs, Run: func(command *cobra.Command, args []string) { - db, err := agent.OpenDatabase(rootFlags.DatabaseFile) + db, err := operator.OpenDatabase(rootFlags.DatabaseFile) exitOnErr("Failed to open database", err) defer db.Close() defer db.Sync() @@ -90,7 +90,7 @@ func NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command { Short: "List operators with persisted offsets", Args: cobra.NoArgs, Run: func(command *cobra.Command, args []string) { - db, err := agent.OpenDatabase(rootFlags.DatabaseFile) + db, err := operator.OpenDatabase(rootFlags.DatabaseFile) exitOnErr("Failed to open database", err) defer db.Close() diff --git a/cmd/stanza/service.go b/cmd/stanza/service.go index 51161fefa..2d4422308 100644 --- a/cmd/stanza/service.go +++ b/cmd/stanza/service.go @@ -23,14 +23,23 @@ func (a *AgentService) Start(s service.Service) error { if err := a.agent.Start(); err != nil { a.agent.Errorw("Failed to start stanza agent", zap.Any("error", err)) a.cancel() + return nil } + + a.agent.Info("Stanza agent started") return nil } // Stop will stop the stanza agent. func (a *AgentService) Stop(s service.Service) error { a.agent.Info("Stopping stanza agent") - a.agent.Stop() + if err := a.agent.Stop(); err != nil { + a.agent.Errorw("Failed to stop stanza agent gracefully", zap.Any("error", err)) + a.cancel() + return nil + } + + a.agent.Info("Stanza agent stopped") a.cancel() return nil } diff --git a/entry/entry_test.go b/entry/entry_test.go index de0f1e1a8..c7b9048ac 100644 --- a/entry/entry_test.go +++ b/entry/entry_test.go @@ -191,3 +191,44 @@ func TestFieldFromString(t *testing.T) { }) } } + +func TestAddLabel(t *testing.T) { + entry := Entry{} + entry.AddLabel("label", "value") + expected := map[string]string{"label": "value"} + require.Equal(t, expected, entry.Labels) +} + +func TestAddResourceKey(t *testing.T) { + entry := Entry{} + entry.AddResourceKey("key", "value") + expected := map[string]string{"key": "value"} + require.Equal(t, expected, entry.Resource) +} + +func TestReadToInterfaceMapWithMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + dest := map[string]interface{}{} + err := entry.readToInterfaceMap(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a map[string]interface{}") +} + +func TestReadToStringMapWithMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + dest := map[string]string{} + err := entry.readToStringMap(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a map[string]string") +} + +func TestReadToInterfaceMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + var dest interface{} + err := entry.readToInterface(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a interface{}") +} diff --git a/entry/field_test.go b/entry/field_test.go index 44acf1b55..a280cd1d9 100644 --- a/entry/field_test.go +++ b/entry/field_test.go @@ -42,6 +42,14 @@ func TestFieldUnmarshalJSON(t *testing.T) { } } +func TestFieldUnmarshalJSONFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f Field + err := json.Unmarshal(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot unmarshal object into Go value of type string") +} + func TestFieldMarshalJSON(t *testing.T) { cases := []struct { name string @@ -114,6 +122,14 @@ func TestFieldUnmarshalYAML(t *testing.T) { } } +func TestFieldUnmarshalYAMLFailure(t *testing.T) { + invalidField := []byte(`invalid: field`) + var f Field + err := yaml.UnmarshalStrict(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot unmarshal !!map into string") +} + func TestFieldMarshalYAML(t *testing.T) { cases := []struct { name string @@ -207,6 +223,9 @@ func TestSplitField(t *testing.T) { {"BracketAtEnd", `$record[`, nil, true}, {"SingleQuoteAtEnd", `$record['`, nil, true}, {"DoubleQuoteAtEnd", `$record["`, nil, true}, + {"BracketMissingQuotes", `$record[test]`, nil, true}, + {"CharacterBetweenBracketAndQuote", `$record["test"a]`, nil, true}, + {"CharacterOutsideBracket", `$record["test"]a`, nil, true}, } for _, tc := range cases { @@ -222,3 +241,21 @@ func TestSplitField(t *testing.T) { }) } } + +func TestFieldFromStringInvalidSplit(t *testing.T) { + _, err := fieldFromString("$resource[test]") + require.Error(t, err) + require.Contains(t, err.Error(), "splitting field") +} + +func TestFieldFromStringWithResource(t *testing.T) { + field, err := fieldFromString(`$resource["test"]`) + require.NoError(t, err) + require.Equal(t, "$resource.test", field.String()) +} + +func TestFieldFromStringWithInvalidResource(t *testing.T) { + _, err := fieldFromString(`$resource["test"]["key"]`) + require.Error(t, err) + require.Contains(t, err.Error(), "resource fields cannot be nested") +} diff --git a/entry/nil_field_test.go b/entry/nil_field_test.go new file mode 100644 index 000000000..089f5be24 --- /dev/null +++ b/entry/nil_field_test.go @@ -0,0 +1,37 @@ +package entry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNilFieldGet(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + value, ok := nilField.Get(entry) + require.True(t, ok) + require.Nil(t, value) +} + +func TestNilFieldSet(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + err := nilField.Set(entry, "value") + require.NoError(t, err) + require.Equal(t, *entry, Entry{}) +} + +func TestNilFieldDelete(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + value, ok := nilField.Delete(entry) + require.True(t, ok) + require.Nil(t, value) + require.Equal(t, *entry, Entry{}) +} + +func TestNilFieldString(t *testing.T) { + nilField := NewNilField() + require.Equal(t, "$nil", nilField.String()) +} diff --git a/entry/record_field.go b/entry/record_field.go index 9834e1b8b..10aaedaae 100644 --- a/entry/record_field.go +++ b/entry/record_field.go @@ -84,7 +84,7 @@ func (f RecordField) Set(entry *Entry, value interface{}) error { for i, key := range f.Keys { if i == len(f.Keys)-1 { currentMap[key] = value - return nil + break } currentMap = f.getNestedMap(currentMap, key) } @@ -122,12 +122,12 @@ func (f RecordField) Delete(entry *Entry) (interface{}, bool) { for i, key := range f.Keys { currentMap, ok := currentValue.(map[string]interface{}) if !ok { - return nil, false + break } currentValue, ok = currentMap[key] if !ok { - return nil, false + break } if i == len(f.Keys)-1 { diff --git a/entry/record_field_test.go b/entry/record_field_test.go index 45b31ef7d..071e84681 100644 --- a/entry/record_field_test.go +++ b/entry/record_field_test.go @@ -1,10 +1,12 @@ package entry import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" ) func testRecord() map[string]interface{} { @@ -159,6 +161,14 @@ func TestRecordFieldDelete(t *testing.T) { nestedMap(), true, }, + { + "InvalidNestedKey", + NewRecordField("simple_key", "missing"), + testRecord(), + testRecord(), + nil, + false, + }, } for _, tc := range cases { @@ -194,6 +204,13 @@ func TestRecordFieldSet(t *testing.T) { "new_value", "new_value", }, + { + "OverwriteRawWithMap", + NewRecordField("embedded", "field"), + "raw_value", + "new_value", + map[string]interface{}{"embedded": map[string]interface{}{"field": "new_value"}}, + }, { "NewMapValue", NewRecordField(), @@ -268,7 +285,70 @@ func TestRecordFieldParent(t *testing.T) { }) } -func TestFieldChild(t *testing.T) { +func TestRecordFieldChild(t *testing.T) { field := RecordField{[]string{"parent"}} require.Equal(t, RecordField{[]string{"parent", "child"}}, field.Child("child")) } + +func TestRecordFieldMerge(t *testing.T) { + entry := &Entry{} + entry.Record = "raw_value" + field := RecordField{[]string{"embedded"}} + values := map[string]interface{}{"new": "values"} + field.Merge(entry, values) + expected := map[string]interface{}{"embedded": values} + require.Equal(t, expected, entry.Record) +} + +func TestRecordFieldMarshalJSON(t *testing.T) { + recordField := RecordField{Keys: []string{"test"}} + json, err := recordField.MarshalJSON() + require.NoError(t, err) + require.Equal(t, []byte(`"test"`), json) +} + +func TestRecordFieldUnmarshalJSON(t *testing.T) { + fieldString := []byte(`"test"`) + var f RecordField + err := json.Unmarshal(fieldString, &f) + require.NoError(t, err) + require.Equal(t, RecordField{Keys: []string{"test"}}, f) +} + +func TestRecordFieldUnmarshalJSONFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f RecordField + err := json.Unmarshal(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "the field is not a string: json") +} + +func TestRecordFieldMarshalYAML(t *testing.T) { + recordField := RecordField{Keys: []string{"test"}} + yaml, err := recordField.MarshalYAML() + require.NoError(t, err) + require.Equal(t, "test", yaml) +} + +func TestRecordFieldUnmarshalYAML(t *testing.T) { + invalidField := []byte("test") + var f RecordField + err := yaml.UnmarshalStrict(invalidField, &f) + require.NoError(t, err) + require.Equal(t, RecordField{Keys: []string{"test"}}, f) +} + +func TestRecordFieldUnmarshalYAMLFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f RecordField + err := yaml.UnmarshalStrict(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "the field is not a string: yaml") +} + +func TestRecordFieldFromJSONDot(t *testing.T) { + jsonDot := "$.test" + recordField := fromJSONDot(jsonDot) + expectedField := RecordField{Keys: []string{"test"}} + require.Equal(t, expectedField, recordField) +} diff --git a/operator/config.go b/operator/config.go index 6236aa7eb..91f8b7e33 100644 --- a/operator/config.go +++ b/operator/config.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" - "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -29,36 +28,6 @@ type BuildContext struct { Logger *zap.SugaredLogger } -// Database is a database used to save offsets -type Database interface { - Close() error - Sync() error - Update(func(*bbolt.Tx) error) error - View(func(*bbolt.Tx) error) error -} - -// StubDatabase is an implementation of Database that -// succeeds on all calls without persisting anything to disk. -// This is used when --database is unspecified. -type StubDatabase struct{} - -// Close will be ignored by the stub database -func (d *StubDatabase) Close() error { return nil } - -// Sync will be ignored by the stub database -func (d *StubDatabase) Sync() error { return nil } - -// Update will be ignored by the stub database -func (d *StubDatabase) Update(func(tx *bbolt.Tx) error) error { return nil } - -// View will be ignored by the stub database -func (d *StubDatabase) View(func(tx *bbolt.Tx) error) error { return nil } - -// NewStubDatabase creates a new StubDatabase -func NewStubDatabase() *StubDatabase { - return &StubDatabase{} -} - // registry is a global registry of operator types to operator builders. var registry = make(map[string]func() Builder) diff --git a/operator/database.go b/operator/database.go new file mode 100644 index 000000000..b0765409c --- /dev/null +++ b/operator/database.go @@ -0,0 +1,63 @@ +//go:generate mockery -name=^(Database)$ -output=../testutil -outpkg=testutil -case=snake + +package operator + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "go.etcd.io/bbolt" +) + +// Database is a database used to save offsets +type Database interface { + Close() error + Sync() error + Update(func(*bbolt.Tx) error) error + View(func(*bbolt.Tx) error) error +} + +// StubDatabase is an implementation of Database that +// succeeds on all calls without persisting anything to disk. +// This is used when --database is unspecified. +type StubDatabase struct{} + +// Close will be ignored by the stub database +func (d *StubDatabase) Close() error { return nil } + +// Sync will be ignored by the stub database +func (d *StubDatabase) Sync() error { return nil } + +// Update will be ignored by the stub database +func (d *StubDatabase) Update(func(tx *bbolt.Tx) error) error { return nil } + +// View will be ignored by the stub database +func (d *StubDatabase) View(func(tx *bbolt.Tx) error) error { return nil } + +// NewStubDatabase creates a new StubDatabase +func NewStubDatabase() *StubDatabase { + return &StubDatabase{} +} + +// OpenDatabase will open and create a database +func OpenDatabase(file string) (Database, error) { + if file == "" { + return NewStubDatabase(), nil + } + + if _, err := os.Stat(filepath.Dir(file)); err != nil { + if os.IsNotExist(err) { + err := os.MkdirAll(filepath.Dir(file), 0755) + if err != nil { + return nil, fmt.Errorf("creating database directory: %s", err) + } + } else { + return nil, err + } + } + + options := &bbolt.Options{Timeout: 1 * time.Second} + return bbolt.Open(file, 0666, options) +} diff --git a/operator/database_test.go b/operator/database_test.go new file mode 100644 index 000000000..2582d665a --- /dev/null +++ b/operator/database_test.go @@ -0,0 +1,38 @@ +package operator + +import ( + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestOpenDatabase(t *testing.T) { + t.Run("Simple", func(t *testing.T) { + tempDir := NewTempDir(t) + db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) + require.NoError(t, err) + require.NotNil(t, db) + }) + + t.Run("NonexistantPathIsCreated", func(t *testing.T) { + tempDir := NewTempDir(t) + db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) + require.NoError(t, err) + require.NotNil(t, db) + }) + + t.Run("BadPermissions", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Windows does not have the same kind of file permissions") + } + tempDir := NewTempDir(t) + err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) + require.NoError(t, err) + db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) + require.Error(t, err) + require.Nil(t, db) + }) +} diff --git a/pipeline/config.go b/pipeline/config.go index f8b5faa49..b2e7c843e 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -14,7 +14,7 @@ import ( type Config []Params // BuildPipeline will build a pipeline from the config. -func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput operator.Operator) (*Pipeline, error) { +func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput operator.Operator) (*DirectedPipeline, error) { operatorConfigs, err := c.buildOperatorConfigs(context.PluginRegistry) if err != nil { return nil, err @@ -29,7 +29,7 @@ func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput opera operators = append(operators, defaultOutput) } - pipeline, err := NewPipeline(operators) + pipeline, err := NewDirectedPipeline(operators) if err != nil { return nil, err } diff --git a/pipeline/directed.go b/pipeline/directed.go new file mode 100644 index 000000000..c785da2a4 --- /dev/null +++ b/pipeline/directed.go @@ -0,0 +1,190 @@ +package pipeline + +import ( + "fmt" + "strings" + + "github.com/observiq/stanza/errors" + "github.com/observiq/stanza/operator" + "gonum.org/v1/gonum/graph/encoding/dot" + "gonum.org/v1/gonum/graph/simple" + "gonum.org/v1/gonum/graph/topo" +) + +// DirectedPipeline is a pipeline backed by a directed graph +type DirectedPipeline struct { + Graph *simple.DirectedGraph + running bool +} + +// Start will start the operators in a pipeline in reverse topological order +func (p *DirectedPipeline) Start() error { + if p.running { + return nil + } + + sortedNodes, _ := topo.Sort(p.Graph) + for i := len(sortedNodes) - 1; i >= 0; i-- { + operator := sortedNodes[i].(OperatorNode).Operator() + operator.Logger().Debug("Starting operator") + if err := operator.Start(); err != nil { + return err + } + operator.Logger().Debug("Started operator") + } + + p.running = true + return nil +} + +// Stop will stop the operators in a pipeline in topological order +func (p *DirectedPipeline) Stop() error { + if !p.running { + return nil + } + + sortedNodes, _ := topo.Sort(p.Graph) + for _, node := range sortedNodes { + operator := node.(OperatorNode).Operator() + operator.Logger().Debug("Stopping operator") + _ = operator.Stop() + operator.Logger().Debug("Stopped operator") + } + + p.running = false + return nil +} + +// Running will return the running state of the pipeline +func (p *DirectedPipeline) Running() bool { + return p.running +} + +// Render will render the pipeline as a dot graph +func (p *DirectedPipeline) Render() ([]byte, error) { + return dot.Marshal(p.Graph, "G", "", " ") +} + +// addNodes will add operators as nodes to the supplied graph. +func addNodes(graph *simple.DirectedGraph, operators []operator.Operator) error { + for _, operator := range operators { + operatorNode := createOperatorNode(operator) + if graph.Node(operatorNode.ID()) != nil { + return errors.NewError( + fmt.Sprintf("operator with id '%s' already exists in pipeline", operatorNode.Operator().ID()), + "ensure that each operator has a unique `type` or `id`", + ) + } + + graph.AddNode(operatorNode) + } + return nil +} + +// connectNodes will connect the nodes in the supplied graph. +func connectNodes(graph *simple.DirectedGraph) error { + nodes := graph.Nodes() + for nodes.Next() { + node := nodes.Node().(OperatorNode) + if err := connectNode(graph, node); err != nil { + return err + } + } + + if _, err := topo.Sort(graph); err != nil { + return errors.NewError( + "pipeline has a circular dependency", + "ensure that all operators are connected in a straight, acyclic line", + "cycles", unorderableToCycles(err.(topo.Unorderable)), + ) + } + + return nil +} + +// connectNode will connect a node to its outputs in the supplied graph. +func connectNode(graph *simple.DirectedGraph, inputNode OperatorNode) error { + for outputOperatorID, outputNodeID := range inputNode.OutputIDs() { + if graph.Node(outputNodeID) == nil { + return errors.NewError( + "operators cannot be connected, because the output does not exist in the pipeline", + "ensure that the output operator is defined", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + outputNode := graph.Node(outputNodeID).(OperatorNode) + if !outputNode.Operator().CanProcess() { + return errors.NewError( + "operators cannot be connected, because the output operator can not process logs", + "ensure that the output operator can process logs (like a parser or destination)", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + if graph.HasEdgeFromTo(inputNode.ID(), outputNodeID) { + return errors.NewError( + "operators cannot be connected, because a connection already exists", + "ensure that only a single connection exists between the two operators", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + edge := graph.NewEdge(inputNode, outputNode) + graph.SetEdge(edge) + } + + return nil +} + +// setOperatorOutputs will set the outputs on operators that can output. +func setOperatorOutputs(operators []operator.Operator) error { + for _, operator := range operators { + if !operator.CanOutput() { + continue + } + + if err := operator.SetOutputs(operators); err != nil { + return errors.WithDetails(err, "operator_id", operator.ID()) + } + } + return nil +} + +// NewDirectedPipeline creates a new directed pipeline +func NewDirectedPipeline(operators []operator.Operator) (*DirectedPipeline, error) { + if err := setOperatorOutputs(operators); err != nil { + return nil, err + } + + graph := simple.NewDirectedGraph() + if err := addNodes(graph, operators); err != nil { + return nil, err + } + + if err := connectNodes(graph); err != nil { + return nil, err + } + + return &DirectedPipeline{Graph: graph}, nil +} + +func unorderableToCycles(err topo.Unorderable) string { + var cycles strings.Builder + for i, cycle := range err { + if i != 0 { + cycles.WriteByte(',') + } + cycles.WriteByte('(') + for _, node := range cycle { + cycles.WriteString(node.(OperatorNode).operator.ID()) + cycles.Write([]byte(` -> `)) + } + cycles.WriteString(cycle[0].(OperatorNode).operator.ID()) + cycles.WriteByte(')') + } + return cycles.String() +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index c09fe6f21..0e01a89e0 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -1,184 +1,11 @@ -package pipeline - -import ( - "fmt" - "strings" - - "github.com/observiq/stanza/errors" - "github.com/observiq/stanza/operator" - "gonum.org/v1/gonum/graph/encoding/dot" - "gonum.org/v1/gonum/graph/simple" - "gonum.org/v1/gonum/graph/topo" -) - -// Pipeline is a directed graph of connected operators. -type Pipeline struct { - Graph *simple.DirectedGraph - running bool -} - -// Start will start the operators in a pipeline in reverse topological order. -func (p *Pipeline) Start() error { - if p.running { - return nil - } - - sortedNodes, _ := topo.Sort(p.Graph) - for i := len(sortedNodes) - 1; i >= 0; i-- { - operator := sortedNodes[i].(OperatorNode).Operator() - operator.Logger().Debug("Starting operator") - if err := operator.Start(); err != nil { - return err - } - operator.Logger().Debug("Started operator") - } - - p.running = true - return nil -} - -// Stop will stop the operators in a pipeline in topological order. -func (p *Pipeline) Stop() { - if !p.running { - return - } - - sortedNodes, _ := topo.Sort(p.Graph) - for _, node := range sortedNodes { - operator := node.(OperatorNode).Operator() - operator.Logger().Debug("Stopping operator") - _ = operator.Stop() - operator.Logger().Debug("Stopped operator") - } - - p.running = false -} - -// MarshalDot will encode the pipeline as a dot graph. -func (p *Pipeline) MarshalDot() ([]byte, error) { - return dot.Marshal(p.Graph, "G", "", " ") -} - -// addNodes will add operators as nodes to the supplied graph. -func addNodes(graph *simple.DirectedGraph, operators []operator.Operator) error { - for _, operator := range operators { - operatorNode := createOperatorNode(operator) - if graph.Node(operatorNode.ID()) != nil { - return errors.NewError( - fmt.Sprintf("operator with id '%s' already exists in pipeline", operatorNode.Operator().ID()), - "ensure that each operator has a unique `type` or `id`", - ) - } - - graph.AddNode(operatorNode) - } - return nil -} +//go:generate mockery -name=^(Pipeline)$ -output=../testutil -outpkg=testutil -case=snake -// connectNodes will connect the nodes in the supplied graph. -func connectNodes(graph *simple.DirectedGraph) error { - nodes := graph.Nodes() - for nodes.Next() { - node := nodes.Node().(OperatorNode) - if err := connectNode(graph, node); err != nil { - return err - } - } - - if _, err := topo.Sort(graph); err != nil { - return errors.NewError( - "pipeline has a circular dependency", - "ensure that all operators are connected in a straight, acyclic line", - "cycles", unorderableToCycles(err.(topo.Unorderable)), - ) - } - - return nil -} - -// connectNode will connect a node to its outputs in the supplied graph. -func connectNode(graph *simple.DirectedGraph, inputNode OperatorNode) error { - for outputOperatorID, outputNodeID := range inputNode.OutputIDs() { - if graph.Node(outputNodeID) == nil { - return errors.NewError( - "operators cannot be connected, because the output does not exist in the pipeline", - "ensure that the output operator is defined", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - outputNode := graph.Node(outputNodeID).(OperatorNode) - if !outputNode.Operator().CanProcess() { - return errors.NewError( - "operators cannot be connected, because the output operator can not process logs", - "ensure that the output operator can process logs (like a parser or destination)", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - if graph.HasEdgeFromTo(inputNode.ID(), outputNodeID) { - return errors.NewError( - "operators cannot be connected, because a connection already exists", - "ensure that only a single connection exists between the two operators", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - edge := graph.NewEdge(inputNode, outputNode) - graph.SetEdge(edge) - } - - return nil -} - -// setOperatorOutputs will set the outputs on operators that can output. -func setOperatorOutputs(operators []operator.Operator) error { - for _, operator := range operators { - if !operator.CanOutput() { - continue - } - - if err := operator.SetOutputs(operators); err != nil { - return errors.WithDetails(err, "operator_id", operator.ID()) - } - } - return nil -} - -// NewPipeline creates a new pipeline of connected operators. -func NewPipeline(operators []operator.Operator) (*Pipeline, error) { - if err := setOperatorOutputs(operators); err != nil { - return nil, err - } - - graph := simple.NewDirectedGraph() - if err := addNodes(graph, operators); err != nil { - return nil, err - } - - if err := connectNodes(graph); err != nil { - return nil, err - } - - return &Pipeline{Graph: graph}, nil -} +package pipeline -func unorderableToCycles(err topo.Unorderable) string { - var cycles strings.Builder - for i, cycle := range err { - if i != 0 { - cycles.WriteByte(',') - } - cycles.WriteByte('(') - for _, node := range cycle { - cycles.WriteString(node.(OperatorNode).operator.ID()) - cycles.Write([]byte(` -> `)) - } - cycles.WriteString(cycle[0].(OperatorNode).operator.ID()) - cycles.WriteByte(')') - } - return cycles.String() +// Pipeline is a collection of connected operators that exchange entries +type Pipeline interface { + Start() error + Stop() error + Render() ([]byte, error) + Running() bool } diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 2cb68a116..7629034c7 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -67,7 +67,7 @@ func TestUnorderableToCycles(t *testing.T) { func TestPipeline(t *testing.T) { t.Run("MultipleStart", func(t *testing.T) { - pipeline, err := NewPipeline([]operator.Operator{}) + pipeline, err := NewDirectedPipeline([]operator.Operator{}) require.NoError(t, err) err = pipeline.Start() @@ -80,7 +80,7 @@ func TestPipeline(t *testing.T) { }) t.Run("MultipleStop", func(t *testing.T) { - pipeline, err := NewPipeline([]operator.Operator{}) + pipeline, err := NewDirectedPipeline([]operator.Operator{}) require.NoError(t, err) err = pipeline.Start() @@ -98,7 +98,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return(nil) - _, err := NewPipeline([]operator.Operator{operator1, operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator1, operator2}) require.Error(t, err) require.Contains(t, err.Error(), "already exists") }) @@ -112,7 +112,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return([]operator.Operator{operator1}) - _, err := NewPipeline([]operator.Operator{operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator2}) require.Error(t, err) require.Contains(t, err.Error(), "does not exist") }) @@ -129,7 +129,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return([]operator.Operator{operator1}) - _, err := NewPipeline([]operator.Operator{operator1, operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator1, operator2}) require.Error(t, err) require.Contains(t, err.Error(), "can not process") }) @@ -168,7 +168,7 @@ func TestPipeline(t *testing.T) { mockOperator3.On("Outputs").Return([]operator.Operator{mockOperator1}) mockOperator3.On("SetOutputs", mock.Anything).Return(nil) - _, err := NewPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + _, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) require.Error(t, err) require.Contains(t, err.Error(), "circular dependency") }) diff --git a/testutil/database.go b/testutil/database.go new file mode 100644 index 000000000..f2ab136a0 --- /dev/null +++ b/testutil/database.go @@ -0,0 +1,69 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package testutil + +import ( + mock "github.com/stretchr/testify/mock" + bbolt "go.etcd.io/bbolt" +) + +// Database is an autogenerated mock type for the Database type +type Database struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Database) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Sync provides a mock function with given fields: +func (_m *Database) Sync() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Update provides a mock function with given fields: _a0 +func (_m *Database) Update(_a0 func(*bbolt.Tx) error) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(func(*bbolt.Tx) error) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// View provides a mock function with given fields: _a0 +func (_m *Database) View(_a0 func(*bbolt.Tx) error) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(func(*bbolt.Tx) error) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/testutil/pipeline.go b/testutil/pipeline.go new file mode 100644 index 000000000..2152bd92f --- /dev/null +++ b/testutil/pipeline.go @@ -0,0 +1,75 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package testutil + +import mock "github.com/stretchr/testify/mock" + +// Pipeline is an autogenerated mock type for the Pipeline type +type Pipeline struct { + mock.Mock +} + +// Render provides a mock function with given fields: +func (_m *Pipeline) Render() ([]byte, error) { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Running provides a mock function with given fields: +func (_m *Pipeline) Running() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Start provides a mock function with given fields: +func (_m *Pipeline) Start() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stop provides a mock function with given fields: +func (_m *Pipeline) Stop() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} From 2fb4b200b72e43b91c6cfe1b94bea398e3297320 Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Mon, 7 Sep 2020 16:38:23 -0400 Subject: [PATCH 02/13] Made a database package and improved tests --- agent/agent.go | 4 +- agent/builder.go | 7 ++- {operator => database}/database.go | 2 +- database/database_test.go | 89 ++++++++++++++++++++++++++++++ errors/error.go | 5 +- internal/version/version_test.go | 29 ++++++++++ operator/config.go | 3 +- operator/config_test.go | 3 +- operator/database_test.go | 38 ------------- operator/helper/persister.go | 6 +- testutil/database.go | 3 +- 11 files changed, 135 insertions(+), 54 deletions(-) rename {operator => database}/database.go (98%) create mode 100644 database/database_test.go create mode 100644 internal/version/version_test.go delete mode 100644 operator/database_test.go diff --git a/agent/agent.go b/agent/agent.go index 6ca296814..f3d6e253d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -3,14 +3,14 @@ package agent import ( "sync" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/database" "github.com/observiq/stanza/pipeline" "go.uber.org/zap" ) // LogAgent is an entity that handles log monitoring. type LogAgent struct { - database operator.Database + database database.Database pipeline pipeline.Pipeline startOnce sync.Once diff --git a/agent/builder.go b/agent/builder.go index 8938f9dac..c261580d2 100644 --- a/agent/builder.go +++ b/agent/builder.go @@ -1,6 +1,7 @@ package agent import ( + "github.com/observiq/stanza/database" "github.com/observiq/stanza/errors" "github.com/observiq/stanza/operator" "go.uber.org/zap" @@ -43,7 +44,7 @@ func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *Lo // Build will build a new log agent using the values defined on the builder func (b *LogAgentBuilder) Build() (*LogAgent, error) { - database, err := operator.OpenDatabase(b.databaseFile) + db, err := database.OpenDatabase(b.databaseFile) if err != nil { return nil, errors.Wrap(err, "open database") } @@ -56,7 +57,7 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) { buildContext := operator.BuildContext{ Logger: b.logger, PluginRegistry: registry, - Database: database, + Database: db, } pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) @@ -66,7 +67,7 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) { return &LogAgent{ pipeline: pipeline, - database: database, + database: db, SugaredLogger: b.logger, }, nil } diff --git a/operator/database.go b/database/database.go similarity index 98% rename from operator/database.go rename to database/database.go index b0765409c..0d0ab97d2 100644 --- a/operator/database.go +++ b/database/database.go @@ -1,6 +1,6 @@ //go:generate mockery -name=^(Database)$ -output=../testutil -outpkg=testutil -case=snake -package operator +package database import ( "fmt" diff --git a/database/database_test.go b/database/database_test.go new file mode 100644 index 000000000..c7c524cd1 --- /dev/null +++ b/database/database_test.go @@ -0,0 +1,89 @@ +package database + +import ( + "io/ioutil" + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/require" +) + +// NewTempDir will return a new temp directory for testing +func NewTempDir(t testing.TB) string { + tempDir, err := ioutil.TempDir("", "") + if err != nil { + t.Errorf(err.Error()) + t.FailNow() + } + + t.Cleanup(func() { + os.RemoveAll(tempDir) + }) + + return tempDir +} + +func TestOpenDatabase(t *testing.T) { + t.Run("Simple", func(t *testing.T) { + tempDir := NewTempDir(t) + db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) + require.NoError(t, err) + require.NotNil(t, db) + }) + + t.Run("NoFile", func(t *testing.T) { + db, err := OpenDatabase("") + require.NoError(t, err) + require.NotNil(t, db) + require.IsType(t, &StubDatabase{}, db) + }) + + t.Run("NonexistantPathIsCreated", func(t *testing.T) { + tempDir := NewTempDir(t) + db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) + require.NoError(t, err) + require.NotNil(t, db) + }) + + t.Run("BadPermissions", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Windows does not have the same kind of file permissions") + } + tempDir := NewTempDir(t) + err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) + require.NoError(t, err) + db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) + require.Error(t, err) + require.Nil(t, db) + }) + + t.Run("ExecuteOnlyPermissions", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Windows does not have the same kind of file permissions") + } + tempDir := NewTempDir(t) + err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0111) + require.NoError(t, err) + db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) + require.Error(t, err) + require.Nil(t, db) + }) + +} + +func TestStubDatabase(t *testing.T) { + stubDatabase := NewStubDatabase() + err := stubDatabase.Close() + require.NoError(t, err) + + err = stubDatabase.Sync() + require.NoError(t, err) + + err = stubDatabase.Update(nil) + require.NoError(t, err) + + err = stubDatabase.View(nil) + require.NoError(t, err) +} diff --git a/errors/error.go b/errors/error.go index 8af2fb16e..5533846de 100644 --- a/errors/error.go +++ b/errors/error.go @@ -32,10 +32,7 @@ func (e AgentError) MarshalLogObject(encoder zapcore.ObjectEncoder) error { } if len(e.Details) != 0 { - err := encoder.AddObject("details", e.Details) - if err != nil { - return err - } + _ = encoder.AddObject("details", e.Details) } return nil diff --git a/internal/version/version_test.go b/internal/version/version_test.go new file mode 100644 index 000000000..ff0c1f54d --- /dev/null +++ b/internal/version/version_test.go @@ -0,0 +1,29 @@ +package version + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func resetVersion() { + Version = "" + GitHash = "" +} + +func TestGetVersionWithVersion(t *testing.T) { + Version = "0.1.1" + defer resetVersion() + require.Equal(t, Version, GetVersion()) +} + +func TestGetVersionWithGitHash(t *testing.T) { + GitHash = "git hash" + defer resetVersion() + require.Equal(t, GitHash, GetVersion()) +} + +func TestGetVersionWithUnknownVersion(t *testing.T) { + defer resetVersion() + require.Equal(t, "unknown", GetVersion()) +} diff --git a/operator/config.go b/operator/config.go index 91f8b7e33..42f04a55f 100644 --- a/operator/config.go +++ b/operator/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "github.com/observiq/stanza/database" "go.uber.org/zap" ) @@ -23,7 +24,7 @@ type Builder interface { // BuildContext supplies contextual resources when building an operator. type BuildContext struct { PluginRegistry PluginRegistry - Database Database + Database database.Database Parameters map[string]interface{} Logger *zap.SugaredLogger } diff --git a/operator/config_test.go b/operator/config_test.go index d8930b3d0..a3f5c0922 100644 --- a/operator/config_test.go +++ b/operator/config_test.go @@ -4,12 +4,13 @@ import ( "encoding/json" "testing" + "github.com/observiq/stanza/database" "github.com/stretchr/testify/require" yaml "gopkg.in/yaml.v2" ) func TestStubDatabase(t *testing.T) { - stub := &StubDatabase{} + stub := &database.StubDatabase{} err := stub.Close() require.NoError(t, err) diff --git a/operator/database_test.go b/operator/database_test.go deleted file mode 100644 index 2582d665a..000000000 --- a/operator/database_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package operator - -import ( - "os" - "path/filepath" - "runtime" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestOpenDatabase(t *testing.T) { - t.Run("Simple", func(t *testing.T) { - tempDir := NewTempDir(t) - db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) - require.NoError(t, err) - require.NotNil(t, db) - }) - - t.Run("NonexistantPathIsCreated", func(t *testing.T) { - tempDir := NewTempDir(t) - db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) - require.NoError(t, err) - require.NotNil(t, db) - }) - - t.Run("BadPermissions", func(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Windows does not have the same kind of file permissions") - } - tempDir := NewTempDir(t) - err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) - require.NoError(t, err) - db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) - require.Error(t, err) - require.Nil(t, db) - }) -} diff --git a/operator/helper/persister.go b/operator/helper/persister.go index ec7a096e7..89018460a 100644 --- a/operator/helper/persister.go +++ b/operator/helper/persister.go @@ -3,7 +3,7 @@ package helper import ( "sync" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/database" "go.etcd.io/bbolt" ) @@ -18,13 +18,13 @@ type Persister interface { // ScopedBBoltPersister is a persister that uses a database for the backend type ScopedBBoltPersister struct { scope []byte - db operator.Database + db database.Database cache map[string][]byte cacheMux sync.Mutex } // NewScopedDBPersister returns a new ScopedBBoltPersister -func NewScopedDBPersister(db operator.Database, scope string) *ScopedBBoltPersister { +func NewScopedDBPersister(db database.Database, scope string) *ScopedBBoltPersister { return &ScopedBBoltPersister{ scope: []byte(scope), db: db, diff --git a/testutil/database.go b/testutil/database.go index f2ab136a0..cd3f21026 100644 --- a/testutil/database.go +++ b/testutil/database.go @@ -3,8 +3,9 @@ package testutil import ( - mock "github.com/stretchr/testify/mock" bbolt "go.etcd.io/bbolt" + + mock "github.com/stretchr/testify/mock" ) // Database is an autogenerated mock type for the Database type From f8ec85c9dbfba91f7b1fc216ab620d0ba2f8c53a Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Mon, 7 Sep 2020 20:04:31 -0400 Subject: [PATCH 03/13] Improved pipeline test coverage --- database/database.go | 2 +- operator/config.go | 2 + operator/operator.go | 2 +- pipeline/config_test.go | 66 +++++++++++ .../{pipeline_test.go => directed_test.go} | 107 ++++++++++++++++++ pipeline/node_test.go | 22 ++++ pipeline/pipeline.go | 2 +- testutil/database.go | 2 +- testutil/operator.go | 2 +- testutil/operator_builder.go | 76 +++++++++++++ testutil/pipeline.go | 2 +- 11 files changed, 279 insertions(+), 6 deletions(-) rename pipeline/{pipeline_test.go => directed_test.go} (60%) create mode 100644 pipeline/node_test.go create mode 100644 testutil/operator_builder.go diff --git a/database/database.go b/database/database.go index 0d0ab97d2..53304523a 100644 --- a/database/database.go +++ b/database/database.go @@ -1,4 +1,4 @@ -//go:generate mockery -name=^(Database)$ -output=../testutil -outpkg=testutil -case=snake +//go:generate mockery --name=^(Database)$ --output=../testutil --outpkg=testutil --case=snake package database diff --git a/operator/config.go b/operator/config.go index 42f04a55f..74eb566dd 100644 --- a/operator/config.go +++ b/operator/config.go @@ -1,3 +1,5 @@ +//go:generate mockery --name=Builder --output=../testutil --outpkg=testutil --filename=operator_builder.go --structname=OperatorBuilder + package operator import ( diff --git a/operator/operator.go b/operator/operator.go index 11cf2e906..993a9b1bf 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -1,4 +1,4 @@ -//go:generate mockery -name=^(Operator)$ -output=../testutil -outpkg=testutil -case=snake +//go:generate mockery --name=^(Operator)$ --output=../testutil --outpkg=testutil --case=snake package operator diff --git a/pipeline/config_test.go b/pipeline/config_test.go index 0bd0d6ccc..1f93a591c 100644 --- a/pipeline/config_test.go +++ b/pipeline/config_test.go @@ -5,10 +5,12 @@ import ( "fmt" "testing" + "github.com/observiq/stanza/operator" _ "github.com/observiq/stanza/operator/builtin/input/generate" "github.com/observiq/stanza/operator/builtin/output/drop" _ "github.com/observiq/stanza/operator/builtin/transformer/noop" "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" yaml "gopkg.in/yaml.v2" ) @@ -502,3 +504,67 @@ func TestMultiRoundtripParams(t *testing.T) { require.Equal(t, marshalledYaml, marshalledYaml2) } } + +func TestBuildPipelineWithFailingOperator(t *testing.T) { + ctx := testutil.NewBuildContext(t) + + type invalidOperatorConfig struct { + OperatorType string `json:"type" yaml:"type"` + testutil.OperatorBuilder + } + + newBuilder := func() operator.Builder { + config := &invalidOperatorConfig{} + config.On("Build", mock.Anything).Return(nil, fmt.Errorf("failed to build operator")) + config.On("SetNamespace", mock.Anything, mock.Anything).Return() + config.On("ID").Return("test_id") + config.On("Type").Return("invalid_operator") + return config + } + + operator.Register("invalid_operator", newBuilder) + config := Config{ + {"type": "invalid_operator"}, + } + _, err := config.BuildPipeline(ctx, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to build operator") +} + +func TestBuildPipelineWithInvalidParam(t *testing.T) { + ctx := testutil.NewBuildContext(t) + config := Config{ + {"missing": "type"}, + } + _, err := config.BuildPipeline(ctx, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "missing required `type` field") +} + +type invalidYaml struct{} + +func (y invalidYaml) MarshalYAML() (interface{}, error) { + return nil, fmt.Errorf("invalid yaml") +} + +func TestBuildAsBuiltinWithInvalidParam(t *testing.T) { + params := Params{ + "field": invalidYaml{}, + } + _, err := params.buildAsBuiltin("test_namespace") + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse config map as yaml") +} + +func TestUnmarshalParamsWithInvalidBytes(t *testing.T) { + bytes := []byte("string") + var params Params + err := yaml.Unmarshal(bytes, ¶ms) + require.Error(t, err) + require.Contains(t, err.Error(), "unmarshal errors") +} + +func TestCleanValueWithUnknownType(t *testing.T) { + value := cleanValue(map[int]int{}) + require.Equal(t, "map[]", value) +} diff --git a/pipeline/pipeline_test.go b/pipeline/directed_test.go similarity index 60% rename from pipeline/pipeline_test.go rename to pipeline/directed_test.go index 7629034c7..92753663f 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/directed_test.go @@ -1,12 +1,14 @@ package pipeline import ( + "fmt" "testing" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap" "gonum.org/v1/gonum/graph" "gonum.org/v1/gonum/graph/simple" "gonum.org/v1/gonum/graph/topo" @@ -173,3 +175,108 @@ func TestPipeline(t *testing.T) { require.Contains(t, err.Error(), "circular dependency") }) } + +func TestPipelineStartOrder(t *testing.T) { + var mock2Started bool + var mock3Started bool + + mockOperator1 := testutil.NewMockOperator("operator1") + mockOperator2 := testutil.NewMockOperator("operator2") + mockOperator3 := testutil.NewMockOperator("operator3") + + mockOperator1.On("Outputs").Return([]operator.Operator{mockOperator2}) + mockOperator2.On("Outputs").Return([]operator.Operator{mockOperator3}) + mockOperator3.On("Outputs").Return(nil) + + mockOperator1.On("SetOutputs", mock.Anything).Return(nil) + mockOperator2.On("SetOutputs", mock.Anything).Return(nil) + mockOperator3.On("SetOutputs", mock.Anything).Return(nil) + + mockOperator1.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + mockOperator2.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + mockOperator3.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + + mockOperator1.On("Start").Return(fmt.Errorf("operator 1 failed to start")) + mockOperator2.On("Start").Run(func(mock.Arguments) { mock2Started = true }).Return(nil) + mockOperator3.On("Start").Run(func(mock.Arguments) { mock3Started = true }).Return(nil) + + pipeline, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + require.NoError(t, err) + + err = pipeline.Start() + require.Error(t, err) + require.Contains(t, err.Error(), "operator 1 failed to start") + require.True(t, mock2Started) + require.True(t, mock3Started) +} + +func TestPipelineStopOrder(t *testing.T) { + stopOrder := []int{} + + mockOperator1 := testutil.NewMockOperator("operator1") + mockOperator2 := testutil.NewMockOperator("operator2") + mockOperator3 := testutil.NewMockOperator("operator3") + + mockOperator1.On("Outputs").Return([]operator.Operator{mockOperator2}) + mockOperator2.On("Outputs").Return([]operator.Operator{mockOperator3}) + mockOperator3.On("Outputs").Return(nil) + + mockOperator1.On("SetOutputs", mock.Anything).Return(nil) + mockOperator2.On("SetOutputs", mock.Anything).Return(nil) + mockOperator3.On("SetOutputs", mock.Anything).Return(nil) + + mockOperator1.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + mockOperator2.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + mockOperator3.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + + mockOperator1.On("Start").Return(nil) + mockOperator2.On("Start").Return(nil) + mockOperator3.On("Start").Return(nil) + + mockOperator1.On("Stop").Run(func(mock.Arguments) { stopOrder = append(stopOrder, 1) }).Return(nil) + mockOperator2.On("Stop").Run(func(mock.Arguments) { stopOrder = append(stopOrder, 2) }).Return(nil) + mockOperator3.On("Stop").Run(func(mock.Arguments) { stopOrder = append(stopOrder, 3) }).Return(nil) + + pipeline, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + require.NoError(t, err) + + err = pipeline.Start() + require.NoError(t, err) + require.True(t, pipeline.Running()) + + err = pipeline.Stop() + require.NoError(t, err) + require.False(t, pipeline.Running()) + require.Equal(t, []int{1, 2, 3}, stopOrder) +} + +func TestPipelineRender(t *testing.T) { + mockOperator1 := testutil.NewMockOperator("operator1") + mockOperator2 := testutil.NewMockOperator("operator2") + mockOperator3 := testutil.NewMockOperator("operator3") + + mockOperator1.On("Outputs").Return([]operator.Operator{mockOperator2}) + mockOperator2.On("Outputs").Return([]operator.Operator{mockOperator3}) + mockOperator3.On("Outputs").Return(nil) + + mockOperator1.On("SetOutputs", mock.Anything).Return(nil) + mockOperator2.On("SetOutputs", mock.Anything).Return(nil) + mockOperator3.On("SetOutputs", mock.Anything).Return(nil) + + pipeline, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + require.NoError(t, err) + + dotGraph, err := pipeline.Render() + require.NoError(t, err) + expected := `strict digraph G { + // Node definitions. + operator1; + operator3; + operator2; + + // Edge definitions. + operator1 -> operator2; + operator2 -> operator3; +}` + require.Equal(t, expected, string(dotGraph)) +} diff --git a/pipeline/node_test.go b/pipeline/node_test.go new file mode 100644 index 000000000..9002bae08 --- /dev/null +++ b/pipeline/node_test.go @@ -0,0 +1,22 @@ +package pipeline + +import ( + "testing" + + _ "github.com/observiq/stanza/operator/builtin/input/generate" + _ "github.com/observiq/stanza/operator/builtin/transformer/noop" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" +) + +func TestNodeDOTID(t *testing.T) { + operator := testutil.NewMockOperator("test") + operator.On("Outputs").Return(nil) + node := createOperatorNode(operator) + require.Equal(t, operator.ID(), node.DOTID()) +} + +func TestCreateNodeID(t *testing.T) { + nodeID := createNodeID("test_id") + require.Equal(t, int64(5795108767401590291), nodeID) +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 0e01a89e0..1a551edfb 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -1,4 +1,4 @@ -//go:generate mockery -name=^(Pipeline)$ -output=../testutil -outpkg=testutil -case=snake +//go:generate mockery --name=^(Pipeline)$ --output=../testutil --outpkg=testutil --case=snake package pipeline diff --git a/testutil/database.go b/testutil/database.go index cd3f21026..afb0b2507 100644 --- a/testutil/database.go +++ b/testutil/database.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.2.1. DO NOT EDIT. package testutil diff --git a/testutil/operator.go b/testutil/operator.go index 4c2f490de..4f409d1fb 100644 --- a/testutil/operator.go +++ b/testutil/operator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.2.1. DO NOT EDIT. package testutil diff --git a/testutil/operator_builder.go b/testutil/operator_builder.go new file mode 100644 index 000000000..0ca10b785 --- /dev/null +++ b/testutil/operator_builder.go @@ -0,0 +1,76 @@ +// Code generated by mockery v2.2.1. DO NOT EDIT. + +package testutil + +import ( + operator "github.com/observiq/stanza/operator" + mock "github.com/stretchr/testify/mock" +) + +// OperatorBuilder is an autogenerated mock type for the Builder type +type OperatorBuilder struct { + mock.Mock +} + +// Build provides a mock function with given fields: _a0 +func (_m *OperatorBuilder) Build(_a0 operator.BuildContext) (operator.Operator, error) { + ret := _m.Called(_a0) + + var r0 operator.Operator + if rf, ok := ret.Get(0).(func(operator.BuildContext) operator.Operator); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(operator.Operator) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(operator.BuildContext) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ID provides a mock function with given fields: +func (_m *OperatorBuilder) ID() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// SetNamespace provides a mock function with given fields: namespace, exclude +func (_m *OperatorBuilder) SetNamespace(namespace string, exclude ...string) { + _va := make([]interface{}, len(exclude)) + for _i := range exclude { + _va[_i] = exclude[_i] + } + var _ca []interface{} + _ca = append(_ca, namespace) + _ca = append(_ca, _va...) + _m.Called(_ca...) +} + +// Type provides a mock function with given fields: +func (_m *OperatorBuilder) Type() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} diff --git a/testutil/pipeline.go b/testutil/pipeline.go index 2152bd92f..c147bb215 100644 --- a/testutil/pipeline.go +++ b/testutil/pipeline.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.2.1. DO NOT EDIT. package testutil From 98ae4cf6dfe2587876144f3f2b7fafe51fb8b05c Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Mon, 7 Sep 2020 20:12:10 -0400 Subject: [PATCH 04/13] Fixed open database in offsets --- cmd/stanza/offsets.go | 6 +++--- cmd/stanza/offsets_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/stanza/offsets.go b/cmd/stanza/offsets.go index 1dc6fcec1..c80a6589b 100644 --- a/cmd/stanza/offsets.go +++ b/cmd/stanza/offsets.go @@ -5,7 +5,7 @@ import ( "io" "os" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/database" "github.com/observiq/stanza/operator/helper" "github.com/spf13/cobra" "go.etcd.io/bbolt" @@ -39,7 +39,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command { Short: "Clear persisted offsets from the database", Args: cobra.ArbitraryArgs, Run: func(command *cobra.Command, args []string) { - db, err := operator.OpenDatabase(rootFlags.DatabaseFile) + db, err := database.OpenDatabase(rootFlags.DatabaseFile) exitOnErr("Failed to open database", err) defer db.Close() defer db.Sync() @@ -90,7 +90,7 @@ func NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command { Short: "List operators with persisted offsets", Args: cobra.NoArgs, Run: func(command *cobra.Command, args []string) { - db, err := operator.OpenDatabase(rootFlags.DatabaseFile) + db, err := database.OpenDatabase(rootFlags.DatabaseFile) exitOnErr("Failed to open database", err) defer db.Close() diff --git a/cmd/stanza/offsets_test.go b/cmd/stanza/offsets_test.go index 86e344e0e..029257c34 100644 --- a/cmd/stanza/offsets_test.go +++ b/cmd/stanza/offsets_test.go @@ -7,7 +7,7 @@ import ( "path/filepath" "testing" - agent "github.com/observiq/stanza/agent" + "github.com/observiq/stanza/database" "github.com/observiq/stanza/operator/helper" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" @@ -27,7 +27,7 @@ func TestOffsets(t *testing.T) { stdout = buf // add an offset to the database - db, err := agent.OpenDatabase(databasePath) + db, err := database.OpenDatabase(databasePath) require.NoError(t, err) db.Update(func(tx *bbolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(helper.OffsetsBucket) From e19f879de6c43abfb19af0d354ab95ece0ccd1ab Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Mon, 7 Sep 2020 22:23:19 -0400 Subject: [PATCH 05/13] Moved operator.Duration to helper.Duration --- operator/buffer/buffer.go | 34 +++++++++---------- operator/buffer/buffer_test.go | 4 +-- operator/buffer/memory_buffer_test.go | 14 ++++---- operator/builtin/input/file/file.go | 16 ++++----- operator/builtin/input/file/file_test.go | 5 +-- operator/builtin/input/windows/operator.go | 12 +++---- .../output/googlecloud/google_cloud.go | 18 +++++----- .../output/googlecloud/google_cloud_test.go | 4 +-- .../k8smetadata/k8s_metadata_decorator.go | 12 +++---- .../transformer/ratelimit/rate_limit.go | 6 ++-- operator/{ => helper}/duration.go | 2 +- operator/{ => helper}/duration_test.go | 2 +- 12 files changed, 65 insertions(+), 64 deletions(-) rename operator/{ => helper}/duration.go (98%) rename operator/{ => helper}/duration_test.go (98%) diff --git a/operator/buffer/buffer.go b/operator/buffer/buffer.go index d4303c0fc..6f3429d98 100644 --- a/operator/buffer/buffer.go +++ b/operator/buffer/buffer.go @@ -7,7 +7,7 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/errors" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" ) // Buffer is an entity that buffers log entries to an operator @@ -23,7 +23,7 @@ type Buffer interface { func NewConfig() Config { return Config{ BufferType: "memory", - DelayThreshold: operator.Duration{Duration: time.Second}, + DelayThreshold: helper.Duration{Duration: time.Second}, BundleCountThreshold: 10_000, BundleByteThreshold: 4 * 1024 * 1024 * 1024, // 4MB BundleByteLimit: 4 * 1024 * 1024 * 1024, // 4MB @@ -35,14 +35,14 @@ func NewConfig() Config { // Config is the configuration of a buffer type Config struct { - BufferType string `json:"type,omitempty" yaml:"type,omitempty"` - DelayThreshold operator.Duration `json:"delay_threshold,omitempty" yaml:"delay_threshold,omitempty"` - BundleCountThreshold int `json:"bundle_count_threshold,omitempty" yaml:"buffer_count_threshold,omitempty"` - BundleByteThreshold int `json:"bundle_byte_threshold,omitempty" yaml:"bundle_byte_threshold,omitempty"` - BundleByteLimit int `json:"bundle_byte_limit,omitempty" yaml:"bundle_byte_limit,omitempty"` - BufferedByteLimit int `json:"buffered_byte_limit,omitempty" yaml:"buffered_byte_limit,omitempty"` - HandlerLimit int `json:"handler_limit,omitempty" yaml:"handler_limit,omitempty"` - Retry RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"` + BufferType string `json:"type,omitempty" yaml:"type,omitempty"` + DelayThreshold helper.Duration `json:"delay_threshold,omitempty" yaml:"delay_threshold,omitempty"` + BundleCountThreshold int `json:"bundle_count_threshold,omitempty" yaml:"buffer_count_threshold,omitempty"` + BundleByteThreshold int `json:"bundle_byte_threshold,omitempty" yaml:"bundle_byte_threshold,omitempty"` + BundleByteLimit int `json:"bundle_byte_limit,omitempty" yaml:"bundle_byte_limit,omitempty"` + BufferedByteLimit int `json:"buffered_byte_limit,omitempty" yaml:"buffered_byte_limit,omitempty"` + HandlerLimit int `json:"handler_limit,omitempty" yaml:"handler_limit,omitempty"` + Retry RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"` } // Build will build a buffer from the supplied configuration @@ -61,18 +61,18 @@ func (config *Config) Build() (Buffer, error) { // NewRetryConfig creates a new retry config func NewRetryConfig() RetryConfig { return RetryConfig{ - InitialInterval: operator.Duration{Duration: 500 * time.Millisecond}, + InitialInterval: helper.Duration{Duration: 500 * time.Millisecond}, RandomizationFactor: 0.5, Multiplier: 1.5, - MaxInterval: operator.Duration{Duration: 15 * time.Minute}, + MaxInterval: helper.Duration{Duration: 15 * time.Minute}, } } // RetryConfig is the configuration of an entity that will retry processing after an error type RetryConfig struct { - InitialInterval operator.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"` - RandomizationFactor float64 `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"` - Multiplier float64 `json:"multiplier,omitempty" yaml:"multiplier,omitempty"` - MaxInterval operator.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"` - MaxElapsedTime operator.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"` + InitialInterval helper.Duration `json:"initial_interval,omitempty" yaml:"initial_interval,omitempty"` + RandomizationFactor float64 `json:"randomization_factor,omitempty" yaml:"randomization_factor,omitempty"` + Multiplier float64 `json:"multiplier,omitempty" yaml:"multiplier,omitempty"` + MaxInterval helper.Duration `json:"max_interval,omitempty" yaml:"max_interval,omitempty"` + MaxElapsedTime helper.Duration `json:"max_elapsed_time,omitempty" yaml:"max_elapsed_time,omitempty"` } diff --git a/operator/buffer/buffer_test.go b/operator/buffer/buffer_test.go index eb8028888..469001637 100644 --- a/operator/buffer/buffer_test.go +++ b/operator/buffer/buffer_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/stretchr/testify/require" "go.uber.org/zap" yaml "gopkg.in/yaml.v2" @@ -34,7 +34,7 @@ func (b *bufferHandler) Logger() *zap.SugaredLogger { func TestBuffer(t *testing.T) { config := NewConfig() - config.DelayThreshold = operator.Duration{ + config.DelayThreshold = helper.Duration{ Duration: 100 * time.Millisecond, } diff --git a/operator/buffer/memory_buffer_test.go b/operator/buffer/memory_buffer_test.go index 5f4194e63..2b3df97f3 100644 --- a/operator/buffer/memory_buffer_test.go +++ b/operator/buffer/memory_buffer_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -47,7 +47,7 @@ func newMockHandler(t *testing.T) *mockHandler { func TestMemoryBufferRetry(t *testing.T) { t.Run("FailOnce", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) @@ -68,7 +68,7 @@ func TestMemoryBufferRetry(t *testing.T) { t.Run("ContextCancelled", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) @@ -95,8 +95,8 @@ func TestMemoryBufferRetry(t *testing.T) { t.Run("ExceededLimit", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Millisecond} - cfg.Retry.MaxElapsedTime = operator.Duration{Duration: time.Nanosecond} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Millisecond} + cfg.Retry.MaxElapsedTime = helper.Duration{Duration: time.Nanosecond} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) @@ -124,7 +124,7 @@ func TestMemoryBufferRetry(t *testing.T) { func TestMemoryBufferFlush(t *testing.T) { t.Run("Simple", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Hour} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Hour} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) @@ -156,7 +156,7 @@ func TestMemoryBufferFlush(t *testing.T) { t.Run("ContextCancelled", func(t *testing.T) { cfg := NewConfig() - cfg.DelayThreshold = operator.Duration{Duration: 10 * time.Hour} + cfg.DelayThreshold = helper.Duration{Duration: 10 * time.Hour} buffer, err := cfg.Build() require.NoError(t, err) handler := newMockHandler(t) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index b0e94de2d..75ddb00dc 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -32,7 +32,7 @@ func init() { func NewInputConfig(operatorID string) *InputConfig { return &InputConfig{ InputConfig: helper.NewInputConfig(operatorID, "file_input"), - PollInterval: operator.Duration{Duration: 200 * time.Millisecond}, + PollInterval: helper.Duration{Duration: 200 * time.Millisecond}, IncludeFileName: true, IncludeFilePath: false, StartAt: "end", @@ -48,13 +48,13 @@ type InputConfig struct { Include []string `json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"` - PollInterval operator.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` - Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` - IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` - IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` - StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` - MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` - Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"` + PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` + IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` + IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` + StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` + MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"` } // MultilineConfig is the configuration a multiline operation diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 6bac6c827..0ccdcd4fd 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -14,6 +14,7 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -27,7 +28,7 @@ func newTestFileSource(t *testing.T) (*InputOperator, chan *entry.Entry) { }) cfg := NewInputConfig("testfile") - cfg.PollInterval = operator.Duration{Duration: 50 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} cfg.StartAt = "beginning" cfg.Include = []string{"should-be-overwritten"} @@ -50,7 +51,7 @@ func TestFileSource_Build(t *testing.T) { cfg.OutputIDs = []string{"mock"} cfg.Include = []string{"/var/log/testpath.*"} cfg.Exclude = []string{"/var/log/testpath.ex*"} - cfg.PollInterval = operator.Duration{Duration: 10 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond} return cfg } diff --git a/operator/builtin/input/windows/operator.go b/operator/builtin/input/windows/operator.go index 69926f22a..388920d5f 100644 --- a/operator/builtin/input/windows/operator.go +++ b/operator/builtin/input/windows/operator.go @@ -19,10 +19,10 @@ func init() { // EventLogConfig is the configuration of a windows event log operator. type EventLogConfig struct { helper.InputConfig `yaml:",inline"` - Channel string `json:"channel" yaml:"channel"` - MaxReads int `json:"max_reads,omitempty" yaml:"max_reads,omitempty"` - StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` - PollInterval operator.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Channel string `json:"channel" yaml:"channel"` + MaxReads int `json:"max_reads,omitempty" yaml:"max_reads,omitempty"` + StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` + PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` } // Build will build a windows event log operator. @@ -64,7 +64,7 @@ func NewDefaultConfig() operator.Builder { InputConfig: helper.NewInputConfig("", "windows_eventlog_input"), MaxReads: 100, StartAt: "end", - PollInterval: operator.Duration{ + PollInterval: helper.Duration{ Duration: 1 * time.Second, }, } @@ -79,7 +79,7 @@ type EventLogInput struct { channel string maxReads int startAt string - pollInterval operator.Duration + pollInterval helper.Duration offsets helper.Persister cancel context.CancelFunc wg *sync.WaitGroup diff --git a/operator/builtin/output/googlecloud/google_cloud.go b/operator/builtin/output/googlecloud/google_cloud.go index 7751a7ab9..ea6188f5c 100644 --- a/operator/builtin/output/googlecloud/google_cloud.go +++ b/operator/builtin/output/googlecloud/google_cloud.go @@ -33,7 +33,7 @@ func NewGoogleCloudOutputConfig(operatorID string) *GoogleCloudOutputConfig { return &GoogleCloudOutputConfig{ OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"), BufferConfig: buffer.NewConfig(), - Timeout: operator.Duration{Duration: 30 * time.Second}, + Timeout: helper.Duration{Duration: 30 * time.Second}, UseCompression: true, } } @@ -43,14 +43,14 @@ type GoogleCloudOutputConfig struct { helper.OutputConfig `yaml:",inline"` BufferConfig buffer.Config `json:"buffer,omitempty" yaml:"buffer,omitempty"` - Credentials string `json:"credentials,omitempty" yaml:"credentials,omitempty"` - CredentialsFile string `json:"credentials_file,omitempty" yaml:"credentials_file,omitempty"` - ProjectID string `json:"project_id" yaml:"project_id"` - LogNameField *entry.Field `json:"log_name_field,omitempty" yaml:"log_name_field,omitempty"` - TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"` - SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"` - Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` - UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"` + Credentials string `json:"credentials,omitempty" yaml:"credentials,omitempty"` + CredentialsFile string `json:"credentials_file,omitempty" yaml:"credentials_file,omitempty"` + ProjectID string `json:"project_id" yaml:"project_id"` + LogNameField *entry.Field `json:"log_name_field,omitempty" yaml:"log_name_field,omitempty"` + TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"` + SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"` + Timeout helper.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"` } // Build will build a google cloud output operator. diff --git a/operator/builtin/output/googlecloud/google_cloud_test.go b/operator/builtin/output/googlecloud/google_cloud_test.go index b2bdb6422..a6c947fc8 100644 --- a/operator/builtin/output/googlecloud/google_cloud_test.go +++ b/operator/builtin/output/googlecloud/google_cloud_test.go @@ -13,7 +13,7 @@ import ( "github.com/golang/protobuf/ptypes" tspb "github.com/golang/protobuf/ptypes/timestamp" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/require" "google.golang.org/api/option" @@ -33,7 +33,7 @@ type googleCloudTestCase struct { func googleCloudBasicConfig() *GoogleCloudOutputConfig { cfg := NewGoogleCloudOutputConfig("test_id") cfg.ProjectID = "test_project_id" - cfg.BufferConfig.DelayThreshold = operator.Duration{Duration: time.Millisecond} + cfg.BufferConfig.DelayThreshold = helper.Duration{Duration: time.Millisecond} return cfg } diff --git a/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go b/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go index 189220afc..76194fd31 100644 --- a/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go +++ b/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go @@ -24,18 +24,18 @@ func NewK8sMetadataDecoratorConfig(operatorID string) *K8sMetadataDecoratorConfi TransformerConfig: helper.NewTransformerConfig(operatorID, "k8s_metadata_decorator"), PodNameField: entry.NewResourceField("k8s.pod.name"), NamespaceField: entry.NewResourceField("k8s.namespace.name"), - CacheTTL: operator.Duration{Duration: 10 * time.Minute}, - Timeout: operator.Duration{Duration: 10 * time.Second}, + CacheTTL: helper.Duration{Duration: 10 * time.Minute}, + Timeout: helper.Duration{Duration: 10 * time.Second}, } } // K8sMetadataDecoratorConfig is the configuration of k8s_metadata_decorator operator type K8sMetadataDecoratorConfig struct { helper.TransformerConfig `yaml:",inline"` - PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"` - NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"` - CacheTTL operator.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"` - Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"` + NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"` + CacheTTL helper.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"` + Timeout helper.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` } // Build will build a k8s_metadata_decorator operator from the supplied configuration diff --git a/operator/builtin/transformer/ratelimit/rate_limit.go b/operator/builtin/transformer/ratelimit/rate_limit.go index 42056096e..992da702e 100644 --- a/operator/builtin/transformer/ratelimit/rate_limit.go +++ b/operator/builtin/transformer/ratelimit/rate_limit.go @@ -25,9 +25,9 @@ func NewRateLimitConfig(operatorID string) *RateLimitConfig { type RateLimitConfig struct { helper.TransformerConfig `yaml:",inline"` - Rate float64 `json:"rate,omitempty" yaml:"rate,omitempty"` - Interval operator.Duration `json:"interval,omitempty" yaml:"interval,omitempty"` - Burst uint `json:"burst,omitempty" yaml:"burst,omitempty"` + Rate float64 `json:"rate,omitempty" yaml:"rate,omitempty"` + Interval helper.Duration `json:"interval,omitempty" yaml:"interval,omitempty"` + Burst uint `json:"burst,omitempty" yaml:"burst,omitempty"` } // Build will build a rate limit operator. diff --git a/operator/duration.go b/operator/helper/duration.go similarity index 98% rename from operator/duration.go rename to operator/helper/duration.go index e53d8dbbc..3cd5dd652 100644 --- a/operator/duration.go +++ b/operator/helper/duration.go @@ -1,4 +1,4 @@ -package operator +package helper import ( "encoding/json" diff --git a/operator/duration_test.go b/operator/helper/duration_test.go similarity index 98% rename from operator/duration_test.go rename to operator/helper/duration_test.go index 8c5c85d3d..9d2461283 100644 --- a/operator/duration_test.go +++ b/operator/helper/duration_test.go @@ -1,4 +1,4 @@ -package operator +package helper import ( "encoding/json" From 8c4ab22e5db088d040a1c6625afedc9a7d08e976 Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Mon, 7 Sep 2020 23:50:40 -0400 Subject: [PATCH 06/13] Increased test coverage for the plugin package --- agent/builder.go | 10 +- .../builtin/input/generate/generate_test.go | 5 +- operator/config.go | 7 +- pipeline/config.go | 11 +- pipeline/config_test.go | 35 ++--- .../parameter.go | 24 ++-- plugin/parameter_test.go | 126 ++++++++++++++++++ {operator => plugin}/plugin.go | 53 ++++---- {operator => plugin}/plugin_test.go | 59 +++++--- testutil/util.go | 5 +- 10 files changed, 242 insertions(+), 93 deletions(-) rename operator/plugin_parameter.go => plugin/parameter.go (85%) create mode 100644 plugin/parameter_test.go rename {operator => plugin}/plugin.go (72%) rename {operator => plugin}/plugin_test.go (90%) diff --git a/agent/builder.go b/agent/builder.go index c261580d2..a3b3636fb 100644 --- a/agent/builder.go +++ b/agent/builder.go @@ -4,6 +4,7 @@ import ( "github.com/observiq/stanza/database" "github.com/observiq/stanza/errors" "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/plugin" "go.uber.org/zap" ) @@ -49,18 +50,17 @@ func (b *LogAgentBuilder) Build() (*LogAgent, error) { return nil, errors.Wrap(err, "open database") } - registry, err := operator.NewPluginRegistry(b.pluginDir) + registry, err := plugin.NewPluginRegistry(b.pluginDir) if err != nil { return nil, errors.Wrap(err, "load plugin registry") } buildContext := operator.BuildContext{ - Logger: b.logger, - PluginRegistry: registry, - Database: db, + Logger: b.logger, + Database: db, } - pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) + pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, registry, b.defaultOutput) if err != nil { return nil, err } diff --git a/operator/builtin/input/generate/generate_test.go b/operator/builtin/input/generate/generate_test.go index a36e8588c..5485f81eb 100644 --- a/operator/builtin/input/generate/generate_test.go +++ b/operator/builtin/input/generate/generate_test.go @@ -8,6 +8,7 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" + "github.com/observiq/stanza/plugin" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -66,7 +67,7 @@ pipeline: tmpl, err := template.New("my_generator").Parse(templateText) require.NoError(t, err) - registry := operator.PluginRegistry{ + registry := plugin.Registry{ "sample": tmpl, } @@ -76,7 +77,7 @@ pipeline: config, err := registry.Render("sample", params) require.NoError(t, err) - expectedConfig := operator.PluginConfig{ + expectedConfig := plugin.Plugin{ Pipeline: []operator.Config{ { Builder: &GenerateInputConfig{ diff --git a/operator/config.go b/operator/config.go index 74eb566dd..9ae23f53d 100644 --- a/operator/config.go +++ b/operator/config.go @@ -25,10 +25,9 @@ type Builder interface { // BuildContext supplies contextual resources when building an operator. type BuildContext struct { - PluginRegistry PluginRegistry - Database database.Database - Parameters map[string]interface{} - Logger *zap.SugaredLogger + Database database.Database + Parameters map[string]interface{} + Logger *zap.SugaredLogger } // registry is a global registry of operator types to operator builders. diff --git a/pipeline/config.go b/pipeline/config.go index b2e7c843e..73ed6f63d 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -7,6 +7,7 @@ import ( "github.com/observiq/stanza/errors" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" + "github.com/observiq/stanza/plugin" yaml "gopkg.in/yaml.v2" ) @@ -14,8 +15,8 @@ import ( type Config []Params // BuildPipeline will build a pipeline from the config. -func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput operator.Operator) (*DirectedPipeline, error) { - operatorConfigs, err := c.buildOperatorConfigs(context.PluginRegistry) +func (c Config) BuildPipeline(context operator.BuildContext, pluginRegistry plugin.Registry, defaultOutput operator.Operator) (*DirectedPipeline, error) { + operatorConfigs, err := c.buildOperatorConfigs(pluginRegistry) if err != nil { return nil, err } @@ -37,7 +38,7 @@ func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput opera return pipeline, nil } -func (c Config) buildOperatorConfigs(pluginRegistry operator.PluginRegistry) ([]operator.Config, error) { +func (c Config) buildOperatorConfigs(pluginRegistry plugin.Registry) ([]operator.Config, error) { operatorConfigs := make([]operator.Config, 0, len(c)) for i, params := range c { @@ -194,7 +195,7 @@ func (p Params) getStringArray(key string) []string { } // BuildConfigs will build operator configs from a params map. -func (p Params) BuildConfigs(pluginRegistry operator.PluginRegistry, namespace string, defaultOutput []string) ([]operator.Config, error) { +func (p Params) BuildConfigs(pluginRegistry plugin.Registry, namespace string, defaultOutput []string) ([]operator.Config, error) { if operator.IsDefined(p.Type()) { return p.buildAsBuiltin(namespace) } @@ -232,7 +233,7 @@ func (p Params) buildAsBuiltin(namespace string) ([]operator.Config, error) { } // buildPlugin will build a plugin config from a params map. -func (p Params) buildPlugin(pluginRegistry operator.PluginRegistry, namespace string, defaultOutput []string) ([]operator.Config, error) { +func (p Params) buildPlugin(pluginRegistry plugin.Registry, namespace string, defaultOutput []string) ([]operator.Config, error) { templateParams := map[string]interface{}{} for key, value := range p { templateParams[key] = value diff --git a/pipeline/config_test.go b/pipeline/config_test.go index 1f93a591c..2daeda78a 100644 --- a/pipeline/config_test.go +++ b/pipeline/config_test.go @@ -9,6 +9,7 @@ import ( _ "github.com/observiq/stanza/operator/builtin/input/generate" "github.com/observiq/stanza/operator/builtin/output/drop" _ "github.com/observiq/stanza/operator/builtin/transformer/noop" + "github.com/observiq/stanza/plugin" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -201,7 +202,8 @@ pipeline: message: test output: {{.output}} ` - err := context.PluginRegistry.Add("plugin", pluginTemplate) + registry := plugin.Registry{} + err := registry.Add("plugin", pluginTemplate) require.NoError(t, err) pipelineConfig := Config{ @@ -216,7 +218,7 @@ pipeline: }, } - _, err = pipelineConfig.BuildPipeline(context, nil) + _, err = pipelineConfig.BuildPipeline(context, registry, nil) require.NoError(t, err) } @@ -239,7 +241,7 @@ func TestBuildValidPipelineDefaultOutput(t *testing.T) { defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context) require.NoError(t, err) - pl, err := pipelineConfig.BuildPipeline(context, defaultOutput) + pl, err := pipelineConfig.BuildPipeline(context, nil, defaultOutput) require.NoError(t, err) require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.generate_input"), createNodeID("$.drop_it"))) } @@ -267,7 +269,7 @@ func TestBuildValidPipelineNextOutputAndDefaultOutput(t *testing.T) { defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context) require.NoError(t, err) - pl, err := pipelineConfig.BuildPipeline(context, defaultOutput) + pl, err := pipelineConfig.BuildPipeline(context, nil, defaultOutput) require.NoError(t, err) require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.generate_input"), createNodeID("$.noop"))) require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.noop"), createNodeID("$.drop_it"))) @@ -284,7 +286,8 @@ pipeline: record: message: test ` - err := context.PluginRegistry.Add("plugin", pluginTemplate) + registry := plugin.Registry{} + err := registry.Add("plugin", pluginTemplate) require.NoError(t, err) pipelineConfig := Config{ @@ -297,7 +300,7 @@ pipeline: defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context) require.NoError(t, err) - pl, err := pipelineConfig.BuildPipeline(context, defaultOutput) + pl, err := pipelineConfig.BuildPipeline(context, registry, defaultOutput) require.NoError(t, err) require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.plugin.plugin_generate"), createNodeID("$.drop_it"))) } @@ -317,7 +320,7 @@ func TestBuildInvalidPipelineInvalidType(t *testing.T) { }, } - _, err := pipelineConfig.BuildPipeline(context, nil) + _, err := pipelineConfig.BuildPipeline(context, nil, nil) require.Error(t, err) require.Contains(t, err.Error(), "unsupported `type` for operator config") } @@ -333,7 +336,8 @@ pipeline: message: test output: {{.output}} ` - err := context.PluginRegistry.Add("plugin", pluginTemplate) + registry := plugin.Registry{} + err := registry.Add("plugin", pluginTemplate) require.NoError(t, err) pipelineConfig := Config{ @@ -348,7 +352,7 @@ pipeline: }, } - _, err = pipelineConfig.BuildPipeline(context, nil) + _, err = pipelineConfig.BuildPipeline(context, registry, nil) require.Error(t, err) require.Contains(t, err.Error(), "build operator configs") } @@ -368,7 +372,7 @@ func TestBuildInvalidPipelineInvalidOperator(t *testing.T) { } context := testutil.NewBuildContext(t) - _, err := pipelineConfig.BuildPipeline(context, nil) + _, err := pipelineConfig.BuildPipeline(context, nil, nil) require.Error(t, err) require.Contains(t, err.Error(), "field number not found") } @@ -393,7 +397,7 @@ func TestBuildInvalidPipelineInvalidGraph(t *testing.T) { } context := testutil.NewBuildContext(t) - _, err := pipelineConfig.BuildPipeline(context, nil) + _, err := pipelineConfig.BuildPipeline(context, nil, nil) require.Error(t, err) require.Contains(t, err.Error(), "does not exist") } @@ -413,7 +417,8 @@ pipeline: record: test output: {{.output}} ` - err := context.PluginRegistry.Add("plugin", pluginTemplate) + registry := plugin.Registry{} + err := registry.Add("plugin", pluginTemplate) require.NoError(t, err) config := Config{ @@ -427,7 +432,7 @@ pipeline: }, } - configs, err := config.buildOperatorConfigs(context.PluginRegistry) + configs, err := config.buildOperatorConfigs(registry) require.NoError(t, err) require.Len(t, configs, 3) @@ -526,7 +531,7 @@ func TestBuildPipelineWithFailingOperator(t *testing.T) { config := Config{ {"type": "invalid_operator"}, } - _, err := config.BuildPipeline(ctx, nil) + _, err := config.BuildPipeline(ctx, nil, nil) require.Error(t, err) require.Contains(t, err.Error(), "failed to build operator") } @@ -536,7 +541,7 @@ func TestBuildPipelineWithInvalidParam(t *testing.T) { config := Config{ {"missing": "type"}, } - _, err := config.BuildPipeline(ctx, nil) + _, err := config.BuildPipeline(ctx, nil, nil) require.Error(t, err) require.Contains(t, err.Error(), "missing required `type` field") } diff --git a/operator/plugin_parameter.go b/plugin/parameter.go similarity index 85% rename from operator/plugin_parameter.go rename to plugin/parameter.go index 0481ce39b..ce5106d20 100644 --- a/operator/plugin_parameter.go +++ b/plugin/parameter.go @@ -1,4 +1,4 @@ -package operator +package plugin import ( "fmt" @@ -6,8 +6,8 @@ import ( "github.com/observiq/stanza/errors" ) -// PluginParameter is a basic description of a plugin's parameter. -type PluginParameter struct { +// Parameter is a basic description of a plugin's parameter. +type Parameter struct { Label string Description string Required bool @@ -16,7 +16,7 @@ type PluginParameter struct { Default interface{} // Must be valid according to Type & ValidValues } -func (param PluginParameter) validate() error { +func (param Parameter) validate() error { if param.Required && param.Default != nil { return errors.NewError( "required parameter cannot have a default value", @@ -39,7 +39,7 @@ func (param PluginParameter) validate() error { return nil } -func (param PluginParameter) validateType() error { +func (param Parameter) validateType() error { switch param.Type { case "string", "int", "bool", "strings", "enum": // ok default: @@ -51,7 +51,7 @@ func (param PluginParameter) validateType() error { return nil } -func (param PluginParameter) validateValidValues() error { +func (param Parameter) validateValidValues() error { switch param.Type { case "string", "int", "bool", "strings": if len(param.ValidValues) > 0 { @@ -71,7 +71,7 @@ func (param PluginParameter) validateValidValues() error { return nil } -func (param PluginParameter) validateDefault() error { +func (param Parameter) validateDefault() error { if param.Default == nil { return nil } @@ -96,7 +96,7 @@ func (param PluginParameter) validateDefault() error { } } -func validateStringDefault(param PluginParameter) error { +func validateStringDefault(param Parameter) error { if _, ok := param.Default.(string); !ok { return errors.NewError( "default value for a parameter of type 'string' must be a string", @@ -106,7 +106,7 @@ func validateStringDefault(param PluginParameter) error { return nil } -func validateIntDefault(param PluginParameter) error { +func validateIntDefault(param Parameter) error { switch param.Default.(type) { case int, int32, int64: return nil @@ -118,7 +118,7 @@ func validateIntDefault(param PluginParameter) error { } } -func validateBoolDefault(param PluginParameter) error { +func validateBoolDefault(param Parameter) error { if _, ok := param.Default.(bool); !ok { return errors.NewError( "default value for a parameter of type 'bool' must be a boolean", @@ -128,7 +128,7 @@ func validateBoolDefault(param PluginParameter) error { return nil } -func validateStringArrayDefault(param PluginParameter) error { +func validateStringArrayDefault(param Parameter) error { defaultList, ok := param.Default.([]interface{}) if !ok { return errors.NewError( @@ -147,7 +147,7 @@ func validateStringArrayDefault(param PluginParameter) error { return nil } -func validateEnumDefault(param PluginParameter) error { +func validateEnumDefault(param Parameter) error { def, ok := param.Default.(string) if !ok { return errors.NewError( diff --git a/plugin/parameter_test.go b/plugin/parameter_test.go new file mode 100644 index 000000000..723e4de24 --- /dev/null +++ b/plugin/parameter_test.go @@ -0,0 +1,126 @@ +package plugin + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValidateDefault(t *testing.T) { + testCases := []struct { + name string + expectErr bool + param Parameter + }{ + { + "ValidStringDefault", + false, + Parameter{ + Type: "string", + Default: "test", + }, + }, + { + "InvalidStringDefault", + true, + Parameter{ + Type: "string", + Default: 5, + }, + }, + { + "ValidIntDefault", + false, + Parameter{ + Type: "int", + Default: 5, + }, + }, + { + "InvalidStringDefault", + true, + Parameter{ + Type: "int", + Default: "test", + }, + }, + { + "ValidBoolDefault", + false, + Parameter{ + Type: "bool", + Default: true, + }, + }, + { + "InvalidBoolDefault", + true, + Parameter{ + Type: "bool", + Default: "test", + }, + }, + { + "ValidStringsDefault", + false, + Parameter{ + Type: "strings", + Default: []interface{}{"test"}, + }, + }, + { + "InvalidStringsDefault", + true, + Parameter{ + Type: "strings", + Default: []interface{}{5}, + }, + }, + { + "ValidEnumDefault", + false, + Parameter{ + Type: "enum", + ValidValues: []string{"test"}, + Default: "test", + }, + }, + { + "InvalidEnumDefault", + true, + Parameter{ + Type: "enum", + ValidValues: []string{"test"}, + Default: "invalid", + }, + }, + { + "NonStringEnumDefault", + true, + Parameter{ + Type: "enum", + ValidValues: []string{"test"}, + Default: 5, + }, + }, + { + "InvalidTypeDefault", + true, + Parameter{ + Type: "float", + Default: 5, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.param.validateDefault() + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/operator/plugin.go b/plugin/plugin.go similarity index 72% rename from operator/plugin.go rename to plugin/plugin.go index ea14a73e9..53b7c2257 100644 --- a/operator/plugin.go +++ b/plugin/plugin.go @@ -1,4 +1,4 @@ -package operator +package plugin import ( "bytes" @@ -9,26 +9,27 @@ import ( "text/template" "github.com/observiq/stanza/errors" + "github.com/observiq/stanza/operator" yaml "gopkg.in/yaml.v2" ) -// PluginConfig is the rendered config of a plugin. -type PluginConfig struct { +// Plugin is the rendered result of a plugin template. +type Plugin struct { Version string Title string Description string - Parameters map[string]PluginParameter - Pipeline []Config + Parameters map[string]Parameter + Pipeline []operator.Config } -// PluginRegistry is a registry of plugin templates. -type PluginRegistry map[string]*template.Template +// Registry is a registry of plugin templates. +type Registry map[string]*template.Template -// Render will render a plugin config using the params and plugin type. -func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) (PluginConfig, error) { +// Render will render a plugin using the params and plugin type. +func (r Registry) Render(pluginType string, params map[string]interface{}) (Plugin, error) { template, ok := r[pluginType] if !ok { - return PluginConfig{}, errors.NewError( + return Plugin{}, errors.NewError( "plugin type does not exist", "ensure that all plugins are defined with a registered type", "plugin_type", pluginType, @@ -37,7 +38,7 @@ func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) var writer bytes.Buffer if err := template.Execute(&writer, params); err != nil { - return PluginConfig{}, errors.NewError( + return Plugin{}, errors.NewError( "failed to render template for plugin", "ensure that all parameters are valid for the plugin", "plugin_type", pluginType, @@ -45,10 +46,10 @@ func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) ) } - var config PluginConfig - if err := yaml.UnmarshalStrict(writer.Bytes(), &config); err != nil { - return PluginConfig{}, errors.NewError( - "failed to unmarshal plugin template to plugin config", + var plugin Plugin + if err := yaml.UnmarshalStrict(writer.Bytes(), &plugin); err != nil { + return Plugin{}, errors.NewError( + "failed to unmarshal plugin template to plugin", "ensure that the plugin template renders a valid pipeline", "plugin_type", pluginType, "rendered_config", writer.String(), @@ -56,10 +57,10 @@ func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) ) } - for name, param := range config.Parameters { + for name, param := range plugin.Parameters { if err := param.validate(); err != nil { - return PluginConfig{}, errors.NewError( - "invalid parameter found in plugin config", + return Plugin{}, errors.NewError( + "invalid parameter found in plugin", "ensure that all parameters are valid for the plugin", "plugin_type", pluginType, "plugin_parameter", name, @@ -69,17 +70,17 @@ func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) } } - return config, nil + return plugin, nil } // IsDefined returns a boolean indicating if a plugin is defined and registered. -func (r PluginRegistry) IsDefined(pluginType string) bool { +func (r Registry) IsDefined(pluginType string) bool { _, ok := r[pluginType] return ok } // LoadAll will load all plugin templates contained in a directory. -func (r PluginRegistry) LoadAll(dir string, pattern string) error { +func (r Registry) LoadAll(dir string, pattern string) error { glob := filepath.Join(dir, pattern) filePaths, err := filepath.Glob(glob) if err != nil { @@ -109,7 +110,7 @@ func (r PluginRegistry) LoadAll(dir string, pattern string) error { } // Load will load a plugin template from a file path. -func (r PluginRegistry) Load(path string) error { +func (r Registry) Load(path string) error { fileName := filepath.Base(path) pluginType := strings.TrimSuffix(fileName, filepath.Ext(fileName)) @@ -122,8 +123,8 @@ func (r PluginRegistry) Load(path string) error { } // Add will add a plugin to the registry. -func (r PluginRegistry) Add(pluginType string, contents string) error { - if IsDefined(pluginType) { +func (r Registry) Add(pluginType string, contents string) error { + if operator.IsDefined(pluginType) { return fmt.Errorf("plugin type %s already exists as a builtin plugin", pluginType) } @@ -137,8 +138,8 @@ func (r PluginRegistry) Add(pluginType string, contents string) error { } // NewPluginRegistry creates a new plugin registry from a plugin directory. -func NewPluginRegistry(dir string) (PluginRegistry, error) { - registry := PluginRegistry{} +func NewPluginRegistry(dir string) (Registry, error) { + registry := Registry{} if err := registry.LoadAll(dir, "*.yaml"); err != nil { return registry, err } diff --git a/operator/plugin_test.go b/plugin/plugin_test.go similarity index 90% rename from operator/plugin_test.go rename to plugin/plugin_test.go index a0b39bea2..3c85dbfee 100644 --- a/operator/plugin_test.go +++ b/plugin/plugin_test.go @@ -1,4 +1,4 @@ -package operator +package plugin import ( "io/ioutil" @@ -7,6 +7,7 @@ import ( "testing" "text/template" + "github.com/observiq/stanza/operator" "github.com/stretchr/testify/require" ) @@ -23,12 +24,8 @@ func NewTempDir(t *testing.T) string { return tempDir } -func TestPluginRegistry_LoadAll(t *testing.T) { - tempDir, err := ioutil.TempDir("", "") - require.NoError(t, err) - t.Cleanup(func() { - os.RemoveAll(tempDir) - }) +func TestNewRegistry(t *testing.T) { + tempDir := NewTempDir(t) test1 := []byte(` id: my_generator @@ -46,21 +43,31 @@ record: message2: {{ .message }} `) - err = ioutil.WriteFile(filepath.Join(tempDir, "test1.yaml"), test1, 0666) + err := ioutil.WriteFile(filepath.Join(tempDir, "test1.yaml"), test1, 0666) require.NoError(t, err) err = ioutil.WriteFile(filepath.Join(tempDir, "test2.yaml"), test2, 0666) require.NoError(t, err) - pluginRegistry := PluginRegistry{} - err = pluginRegistry.LoadAll(tempDir, "*.yaml") + registry, err := NewPluginRegistry(tempDir) require.NoError(t, err) - require.Equal(t, 2, len(pluginRegistry)) + require.Equal(t, 2, len(registry)) + require.True(t, registry.IsDefined("test1")) + require.True(t, registry.IsDefined("test2")) } -func TestPluginRegistryRender(t *testing.T) { +func TestNewRegistryFailure(t *testing.T) { + tempDir := NewTempDir(t) + err := ioutil.WriteFile(filepath.Join(tempDir, "invalid.yaml"), []byte("pipeline:"), 0111) + require.NoError(t, err) + + _, err = NewPluginRegistry(tempDir) + require.Error(t, err) +} + +func TestRegistryRender(t *testing.T) { t.Run("ErrorTypeDoesNotExist", func(t *testing.T) { - reg := PluginRegistry{} + reg := Registry{} _, err := reg.Render("unknown", map[string]interface{}{}) require.Error(t, err) require.Contains(t, err.Error(), "does not exist") @@ -70,7 +77,7 @@ func TestPluginRegistryRender(t *testing.T) { tmpl, err := template.New("plugintype").Parse(`{{ .panicker }}`) require.NoError(t, err) - reg := PluginRegistry{ + reg := Registry{ "plugintype": tmpl, } params := map[string]interface{}{ @@ -83,24 +90,24 @@ func TestPluginRegistryRender(t *testing.T) { }) } -func TestPluginRegistryLoad(t *testing.T) { +func TestRegistryLoad(t *testing.T) { t.Run("LoadAllBadGlob", func(t *testing.T) { - reg := PluginRegistry{} + reg := Registry{} err := reg.LoadAll("", `[]`) require.Error(t, err) require.Contains(t, err.Error(), "with glob pattern") }) t.Run("AddDuplicate", func(t *testing.T) { - reg := PluginRegistry{} - Register("copy", func() Builder { return nil }) + reg := Registry{} + operator.Register("copy", func() operator.Builder { return nil }) err := reg.Add("copy", "pipeline:\n") require.Error(t, err) require.Contains(t, err.Error(), "already exists") }) t.Run("AddBadTemplate", func(t *testing.T) { - reg := PluginRegistry{} + reg := Registry{} err := reg.Add("new", "{{ nofunc }") require.Error(t, err) require.Contains(t, err.Error(), "as a plugin template") @@ -112,7 +119,7 @@ func TestPluginRegistryLoad(t *testing.T) { err := ioutil.WriteFile(pluginPath, []byte("pipeline:\n"), 0755) require.NoError(t, err) - reg := PluginRegistry{} + reg := Registry{} err = reg.LoadAll(tempDir, "*.yaml") require.Error(t, err) }) @@ -615,7 +622,7 @@ pipeline: for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - reg := PluginRegistry{} + reg := Registry{} err := reg.Add(tc.name, tc.template) require.NoError(t, err) _, err = reg.Render(tc.name, map[string]interface{}{}) @@ -627,3 +634,13 @@ pipeline: }) } } + +func TestDefaultPluginFuncWithValue(t *testing.T) { + result := defaultPluginFunc("default_value", "supplied_value") + require.Equal(t, "supplied_value", result) +} + +func TestDefaultPluginFuncWithoutValue(t *testing.T) { + result := defaultPluginFunc("default_value", nil) + require.Equal(t, "default_value", result) +} diff --git a/testutil/util.go b/testutil/util.go index fdabbbf5a..2754e14bd 100644 --- a/testutil/util.go +++ b/testutil/util.go @@ -55,9 +55,8 @@ func NewTestDatabase(t testing.TB) *bbolt.DB { // NewBuildContext will return a new build context for testing func NewBuildContext(t testing.TB) operator.BuildContext { return operator.BuildContext{ - PluginRegistry: make(operator.PluginRegistry), - Database: NewTestDatabase(t), - Logger: zaptest.NewLogger(t).Sugar(), + Database: NewTestDatabase(t), + Logger: zaptest.NewLogger(t).Sugar(), } } From f545327b730c75409bbd20fcaeec8498c83799ac Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Tue, 8 Sep 2020 09:47:33 -0400 Subject: [PATCH 07/13] Fixed graph command --- cmd/stanza/graph.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cmd/stanza/graph.go b/cmd/stanza/graph.go index fbae1cf19..38298acff 100644 --- a/cmd/stanza/graph.go +++ b/cmd/stanza/graph.go @@ -4,8 +4,8 @@ import ( "os" "github.com/observiq/stanza/agent" - "github.com/observiq/stanza/operator" pg "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/plugin" "github.com/spf13/cobra" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -43,17 +43,16 @@ func runGraph(_ *cobra.Command, _ []string, flags *RootFlags) { os.Exit(1) } - pluginRegistry, err := operator.NewPluginRegistry(flags.PluginDir) + pluginRegistry, err := plugin.NewPluginRegistry(flags.PluginDir) if err != nil { logger.Errorw("Failed to load plugin registry", zap.Any("error", err)) } buildContext := pg.BuildContext{ - PluginRegistry: pluginRegistry, - Logger: logger, + Logger: logger, } - pipeline, err := cfg.Pipeline.BuildPipeline(buildContext, nil) + pipeline, err := cfg.Pipeline.BuildPipeline(buildContext, pluginRegistry, nil) if err != nil { logger.Errorw("Failed to build operator pipeline", zap.Any("error", err)) os.Exit(1) From dde4b71f4910638a8cd60a4a75ec8290939324ad Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Tue, 8 Sep 2020 11:11:44 -0400 Subject: [PATCH 08/13] Improved JSON test coverage --- operator/builtin/parser/json/json_test.go | 83 +++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/operator/builtin/parser/json/json_test.go b/operator/builtin/parser/json/json_test.go index 1b19b2967..322e6bb2c 100644 --- a/operator/builtin/parser/json/json_test.go +++ b/operator/builtin/parser/json/json_test.go @@ -2,6 +2,7 @@ package json import ( "context" + "fmt" "testing" "time" @@ -16,6 +17,88 @@ import ( "go.uber.org/zap" ) +func NewTestConfig(t *testing.T) (*operator.Config, error) { + json := `{ + "type": "json_parser", + "id": "test_id", + "output": "test_output" + }` + config := &operator.Config{} + err := config.UnmarshalJSON([]byte(json)) + return config, err +} + +func NewTestParser(t *testing.T) (*JSONParser, error) { + config, err := NewTestConfig(t) + if err != nil { + return nil, err + } + + ctx := testutil.NewBuildContext(t) + op, err := config.Build(ctx) + if err != nil { + return nil, err + } + + parser, ok := op.(*JSONParser) + if !ok { + return nil, fmt.Errorf("operator is not a json parser") + } + + return parser, nil +} + +func TestJSONParserConfigBuild(t *testing.T) { + config, err := NewTestConfig(t) + require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + parser, err := config.Build(ctx) + require.NoError(t, err) + require.IsType(t, &JSONParser{}, parser) +} + +func TestJSONParserConfigBuildFailure(t *testing.T) { + config, err := NewTestConfig(t) + require.NoError(t, err) + + parserConfig, ok := config.Builder.(*JSONParserConfig) + require.True(t, ok) + + parserConfig.OnError = "invalid_on_error" + ctx := testutil.NewBuildContext(t) + _, err = config.Build(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid `on_error` field") +} + +func TestJSONParserStringFailure(t *testing.T) { + parser, err := NewTestParser(t) + require.NoError(t, err) + + _, err = parser.parse("invalid") + require.Error(t, err) + require.Contains(t, err.Error(), "error found in #1 byte") +} + +func TestJSONParserByteFailure(t *testing.T) { + parser, err := NewTestParser(t) + require.NoError(t, err) + + _, err = parser.parse([]byte("invalid")) + require.Error(t, err) + require.Contains(t, err.Error(), "error found in #1 byte") +} + +func TestJSONParserInvalidType(t *testing.T) { + parser, err := NewTestParser(t) + require.NoError(t, err) + + _, err = parser.parse([]int{}) + require.Error(t, err) + require.Contains(t, err.Error(), "type []int cannot be parsed as JSON") +} + func NewFakeJSONOperator() (*JSONParser, *testutil.Operator) { mock := testutil.Operator{} logger, _ := zap.NewProduction() From b474bf2f1454acf85cd66bbc65cb7c7fa14bd201 Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Tue, 8 Sep 2020 12:42:07 -0400 Subject: [PATCH 09/13] Improved regex parser test coverage --- operator/builtin/parser/regex/regex_test.go | 75 +++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/operator/builtin/parser/regex/regex_test.go b/operator/builtin/parser/regex/regex_test.go index 0fa02f4d5..eec17b799 100644 --- a/operator/builtin/parser/regex/regex_test.go +++ b/operator/builtin/parser/regex/regex_test.go @@ -2,6 +2,7 @@ package regex import ( "context" + "fmt" "regexp" "testing" @@ -13,6 +14,80 @@ import ( "github.com/stretchr/testify/require" ) +func NewTestConfig(t *testing.T, regex string) (*operator.Config, error) { + json := `{ + "type": "regex_parser", + "id": "test_id", + "regex": "%s", + "output": "test_output" + }` + json = fmt.Sprintf(json, regex) + config := &operator.Config{} + err := config.UnmarshalJSON([]byte(json)) + return config, err +} + +func NewTestParser(t *testing.T, regex string) (*RegexParser, error) { + config, err := NewTestConfig(t, regex) + if err != nil { + return nil, err + } + + ctx := testutil.NewBuildContext(t) + op, err := config.Build(ctx) + if err != nil { + return nil, err + } + + parser, ok := op.(*RegexParser) + if !ok { + return nil, fmt.Errorf("operator is not a regex parser") + } + + return parser, nil +} + +func TestRegexParserBuildFailure(t *testing.T) { + config, err := NewTestConfig(t, "^(?Ptest)") + require.NoError(t, err) + + parserConfig, ok := config.Builder.(*RegexParserConfig) + require.True(t, ok) + + parserConfig.OnError = "invalid_on_error" + ctx := testutil.NewBuildContext(t) + _, err = config.Build(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid `on_error` field") +} + +func TestRegexParserStringFailure(t *testing.T) { + parser, err := NewTestParser(t, "^(?Ptest)") + require.NoError(t, err) + + _, err = parser.parse("invalid") + require.Error(t, err) + require.Contains(t, err.Error(), "regex pattern does not match") +} + +func TestRegexParserByteFailure(t *testing.T) { + parser, err := NewTestParser(t, "^(?Ptest)") + require.NoError(t, err) + + _, err = parser.parse([]byte("invalid")) + require.Error(t, err) + require.Contains(t, err.Error(), "regex pattern does not match") +} + +func TestRegexParserInvalidType(t *testing.T) { + parser, err := NewTestParser(t, "^(?Ptest)") + require.NoError(t, err) + + _, err = parser.parse([]int{}) + require.Error(t, err) + require.Contains(t, err.Error(), "type '[]int' cannot be parsed as regex") +} + func newFakeRegexParser() (*RegexParser, *testutil.Operator) { mockOperator := testutil.Operator{} return &RegexParser{ From 3d7e1d7e241cc7698be8857786a62a0e0d694957 Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Tue, 8 Sep 2020 13:10:02 -0400 Subject: [PATCH 10/13] Added tests to persister --- operator/helper/persister_test.go | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 operator/helper/persister_test.go diff --git a/operator/helper/persister_test.go b/operator/helper/persister_test.go new file mode 100644 index 000000000..85eb44fc9 --- /dev/null +++ b/operator/helper/persister_test.go @@ -0,0 +1,35 @@ +package helper + +import ( + "path/filepath" + "testing" + + "github.com/observiq/stanza/database" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" +) + +func TestPersisterCache(t *testing.T) { + stubDatabase := database.NewStubDatabase() + persister := NewScopedDBPersister(stubDatabase, "test") + persister.Set("key", []byte("value")) + value := persister.Get("key") + require.Equal(t, []byte("value"), value) +} + +func TestPersisterLoad(t *testing.T) { + tempDir := testutil.NewTempDir(t) + db, err := database.OpenDatabase(filepath.Join(tempDir, "test.db")) + persister := NewScopedDBPersister(db, "test") + persister.Set("key", []byte("value")) + + err = persister.Sync() + require.NoError(t, err) + + newPersister := NewScopedDBPersister(db, "test") + err = newPersister.Load() + require.NoError(t, err) + + value := newPersister.Get("key") + require.Equal(t, []byte("value"), value) +} From 7bc72c563f5679b98e460b29b3855d09ae1e6a6a Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Tue, 8 Sep 2020 15:12:11 -0400 Subject: [PATCH 11/13] Improved operator package test coverage --- operator/config_test.go | 45 ++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/operator/config_test.go b/operator/config_test.go index a3f5c0922..315a56288 100644 --- a/operator/config_test.go +++ b/operator/config_test.go @@ -4,27 +4,10 @@ import ( "encoding/json" "testing" - "github.com/observiq/stanza/database" "github.com/stretchr/testify/require" yaml "gopkg.in/yaml.v2" ) -func TestStubDatabase(t *testing.T) { - stub := &database.StubDatabase{} - - err := stub.Close() - require.NoError(t, err) - - err = stub.Sync() - require.NoError(t, err) - - err = stub.Update(nil) - require.NoError(t, err) - - err = stub.View(nil) - require.NoError(t, err) -} - type FakeBuilder struct { OperatorID string `json:"id" yaml:"id"` OperatorType string `json:"type" yaml:"type"` @@ -37,10 +20,19 @@ func (f *FakeBuilder) ID() string { return "pl func (f *FakeBuilder) Type() string { return "plugin" } func TestUnmarshalJSONErrors(t *testing.T) { + t.Run("ValidJSON", func(t *testing.T) { + Register("fake_operator", func() Builder { return &FakeBuilder{} }) + raw := `{"type":"fake_operator"}` + cfg := &Config{} + err := cfg.UnmarshalJSON([]byte(raw)) + require.NoError(t, err) + require.IsType(t, &FakeBuilder{}, cfg.Builder) + }) + t.Run("InvalidJSON", func(t *testing.T) { raw := `{}}` - var cfg Config - err := json.Unmarshal([]byte(raw), &cfg) + cfg := &Config{} + err := cfg.UnmarshalJSON([]byte(raw)) require.Error(t, err) require.Contains(t, err.Error(), "invalid") }) @@ -86,6 +78,15 @@ func TestMarshalJSON(t *testing.T) { } func TestUnmarshalYAMLErrors(t *testing.T) { + t.Run("ValidYAML", func(t *testing.T) { + Register("fake_operator", func() Builder { return &FakeBuilder{} }) + raw := `type: fake_operator` + var cfg Config + err := yaml.Unmarshal([]byte(raw), &cfg) + require.NoError(t, err) + require.IsType(t, &FakeBuilder{}, cfg.Builder) + }) + t.Run("InvalidYAML", func(t *testing.T) { raw := `-- - \n||\\` var cfg Config @@ -141,3 +142,9 @@ func TestMarshalYAML(t *testing.T) { expected := "id: plugin\ntype: plugin\narray:\n- test\n" require.Equal(t, expected, string(out)) } + +func TestIsDefined(t *testing.T) { + Register("fake_operator", func() Builder { return &FakeBuilder{} }) + require.True(t, IsDefined("fake_operator")) +} + From 6bc68fd660974eb37714e3fd0b875275b76a6ea2 Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Tue, 8 Sep 2020 15:25:50 -0400 Subject: [PATCH 12/13] Improved drop_output test coverage --- operator/builtin/output/drop/drop_test.go | 75 ++++++++++++++++++++--- 1 file changed, 66 insertions(+), 9 deletions(-) diff --git a/operator/builtin/output/drop/drop_test.go b/operator/builtin/output/drop/drop_test.go index f40ce8f89..c42b00a78 100644 --- a/operator/builtin/output/drop/drop_test.go +++ b/operator/builtin/output/drop/drop_test.go @@ -1,16 +1,73 @@ package drop import ( - "github.com/observiq/stanza/operator/helper" + "context" + "fmt" + "testing" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" ) -func newFakeNullOutput() *DropOutput { - return &DropOutput{ - OutputOperator: helper.OutputOperator{ - BasicOperator: helper.BasicOperator{ - OperatorID: "testnull", - OperatorType: "drop_output", - }, - }, +func NewTestConfig(t *testing.T) (*operator.Config, error) { + json := `{ + "type": "drop_output", + "id": "test_id" + }` + config := &operator.Config{} + err := config.UnmarshalJSON([]byte(json)) + return config, err +} + +func NewTestOutput(t *testing.T) (*DropOutput, error) { + config, err := NewTestConfig(t) + if err != nil { + return nil, err + } + + ctx := testutil.NewBuildContext(t) + op, err := config.Build(ctx) + if err != nil { + return nil, err + } + + output, ok := op.(*DropOutput) + if !ok { + return nil, fmt.Errorf("operator is not a drop output") } + + return output, nil +} + +func TestBuildValid(t *testing.T) { + cfg, err := NewTestConfig(t) + require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + output, err := cfg.Build(ctx) + require.NoError(t, err) + require.IsType(t, &DropOutput{}, output) +} + +func TestBuildIvalid(t *testing.T) { + cfg, err := NewTestConfig(t) + require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + ctx.Logger = nil + _, err = cfg.Build(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "build context is missing a logger") +} + +func TestProcess(t *testing.T) { + output, err := NewTestOutput(t) + require.NoError(t, err) + + entry := entry.New() + ctx := context.Background() + result := output.Process(ctx, entry) + require.Nil(t, result) } From 6b6e34baf33a2116cf1f47f1497afed5e8a03cf1 Mon Sep 17 00:00:00 2001 From: Joshua Williams Date: Tue, 8 Sep 2020 15:40:03 -0400 Subject: [PATCH 13/13] Improved noop operator test coverage --- .../builtin/transformer/noop/noop_test.go | 77 ++++++++++++++++++- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/operator/builtin/transformer/noop/noop_test.go b/operator/builtin/transformer/noop/noop_test.go index a3d7a4c4f..afc9cceb3 100644 --- a/operator/builtin/transformer/noop/noop_test.go +++ b/operator/builtin/transformer/noop/noop_test.go @@ -1,16 +1,85 @@ package noop import ( + "context" + "fmt" "testing" + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestNoopOperatorBuild(t *testing.T) { - cfg := NewNoopOperatorConfig("test_operator_id") - cfg.OutputIDs = []string{"output"} +func NewTestConfig(t *testing.T) (*operator.Config, error) { + json := `{ + "type": "noop", + "id": "test_id", + "output": "test_output" + }` + config := &operator.Config{} + err := config.UnmarshalJSON([]byte(json)) + return config, err +} + +func NewTestOperator(t *testing.T) (*NoopOperator, error) { + config, err := NewTestConfig(t) + if err != nil { + return nil, err + } + + ctx := testutil.NewBuildContext(t) + op, err := config.Build(ctx) + if err != nil { + return nil, err + } + + noop, ok := op.(*NoopOperator) + if !ok { + return nil, fmt.Errorf("operator is not a json parser") + } + + return noop, nil +} + +func TestBuildValid(t *testing.T) { + cfg, err := NewTestConfig(t) + require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + output, err := cfg.Build(ctx) + require.NoError(t, err) + require.IsType(t, &NoopOperator{}, output) +} - _, err := cfg.Build(testutil.NewBuildContext(t)) +func TestBuildIvalid(t *testing.T) { + cfg, err := NewTestConfig(t) require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + ctx.Logger = nil + _, err = cfg.Build(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "build context is missing a logger") +} + +func TestProcess(t *testing.T) { + noop, err := NewTestOperator(t) + require.NoError(t, err) + + var processedEntry interface{} + mockOutput := testutil.NewMockOperator("test_output") + mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { processedEntry = args[1] }).Return(nil) + noop.OutputOperators = []operator.Operator{mockOutput} + + entry := entry.New() + entry.AddLabel("label", "value") + entry.AddResourceKey("resource", "value") + + expected := entry.Copy() + ctx := context.Background() + result := noop.Process(ctx, entry) + require.Nil(t, result) + require.Equal(t, expected, processedEntry) }