diff --git a/agent/agent.go b/agent/agent.go index 5f243c62b..6ca296814 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1,23 +1,17 @@ package agent import ( - "fmt" - "os" - "path/filepath" "sync" - "time" - "github.com/observiq/stanza/errors" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/pipeline" - "go.etcd.io/bbolt" "go.uber.org/zap" ) // LogAgent is an entity that handles log monitoring. type LogAgent struct { database operator.Database - pipeline *pipeline.Pipeline + pipeline pipeline.Pipeline startOnce sync.Once stopOnce sync.Once @@ -25,109 +19,29 @@ type LogAgent struct { *zap.SugaredLogger } -// Start will start the log monitoring process. +// Start will start the log monitoring process func (a *LogAgent) Start() (err error) { a.startOnce.Do(func() { err = a.pipeline.Start() if err != nil { return } - a.Info("Agent started") }) return } -// Stop will stop the log monitoring process. -func (a *LogAgent) Stop() { +// Stop will stop the log monitoring process +func (a *LogAgent) Stop() (err error) { a.stopOnce.Do(func() { - a.pipeline.Stop() - a.database.Close() - a.Info("Agent stopped") - }) -} - -// OpenDatabase will open and create a database. 
-func OpenDatabase(file string) (operator.Database, error) { - if file == "" { - return operator.NewStubDatabase(), nil - } - - if _, err := os.Stat(filepath.Dir(file)); err != nil { - if os.IsNotExist(err) { - err := os.MkdirAll(filepath.Dir(file), 0755) - if err != nil { - return nil, fmt.Errorf("creating database directory: %s", err) - } - } else { - return nil, err + err = a.pipeline.Stop() + if err != nil { + return } - } - - options := &bbolt.Options{Timeout: 1 * time.Second} - return bbolt.Open(file, 0666, options) -} - -// LogAgentBuilder is a construct used to build a log agent -type LogAgentBuilder struct { - cfg *Config - logger *zap.SugaredLogger - pluginDir string - databaseFile string - defaultOutput operator.Operator -} - -// NewBuilder creates a new LogAgentBuilder -func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { - return &LogAgentBuilder{ - cfg: cfg, - logger: logger, - } -} - -// WithPluginDir adds the specified plugin directory when building a log agent -func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { - b.pluginDir = pluginDir - return b -} -// WithDatabaseFile adds the specified database file when building a log agent -func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { - b.databaseFile = databaseFile - return b -} - -// WithDefaultOutput adds a default output when building a log agent -func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { - b.defaultOutput = defaultOutput - return b -} - -// Build will build a new log agent using the values defined on the builder -func (b *LogAgentBuilder) Build() (*LogAgent, error) { - database, err := OpenDatabase(b.databaseFile) - if err != nil { - return nil, errors.Wrap(err, "open database") - } - - registry, err := operator.NewPluginRegistry(b.pluginDir) - if err != nil { - return nil, errors.Wrap(err, "load plugin registry") - } - - buildContext := operator.BuildContext{ - 
Logger: b.logger, - PluginRegistry: registry, - Database: database, - } - - pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) - if err != nil { - return nil, err - } - - return &LogAgent{ - pipeline: pipeline, - database: database, - SugaredLogger: b.logger, - }, nil + err = a.database.Close() + if err != nil { + return + } + }) + return } diff --git a/agent/agent_test.go b/agent/agent_test.go index 925b764c8..c8e536462 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1,9 +1,7 @@ package agent import ( - "os" - "path/filepath" - "runtime" + "fmt" "testing" "github.com/observiq/stanza/testutil" @@ -11,44 +9,87 @@ import ( "go.uber.org/zap" ) -func TestNewLogAgent(t *testing.T) { - mockCfg := Config{} - mockLogger := zap.NewNop().Sugar() - mockPluginDir := "/some/path/plugins" - mockDatabaseFile := "" - agent, err := NewBuilder(&mockCfg, mockLogger). - WithPluginDir(mockPluginDir). - WithDatabaseFile(mockDatabaseFile). - Build() +func TestStartAgentSuccess(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Start").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + } + err := agent.Start() require.NoError(t, err) + pipeline.AssertCalled(t, "Start") +} + +func TestStartAgentFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + failure := fmt.Errorf("failed to start pipeline") + pipeline.On("Start").Return(failure) - require.Equal(t, mockLogger, agent.SugaredLogger) + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + } + err := agent.Start() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Start") } -func TestOpenDatabase(t *testing.T) { - t.Run("Simple", func(t *testing.T) { - tempDir := testutil.NewTempDir(t) - db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) - require.NoError(t, err) - require.NotNil(t, db) - }) - - t.Run("NonexistantPathIsCreated", func(t *testing.T) 
{ - tempDir := testutil.NewTempDir(t) - db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) - require.NoError(t, err) - require.NotNil(t, db) - }) - - t.Run("BadPermissions", func(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Windows does not have the same kind of file permissions") - } - tempDir := testutil.NewTempDir(t) - err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) - require.NoError(t, err) - db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) - require.Error(t, err) - require.Nil(t, db) - }) +func TestStopAgentSuccess(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Stop").Return(nil) + database := &testutil.Database{} + database.On("Close").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.NoError(t, err) + pipeline.AssertCalled(t, "Stop") + database.AssertCalled(t, "Close") +} + +func TestStopAgentPipelineFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + failure := fmt.Errorf("failed to start pipeline") + pipeline.On("Stop").Return(failure) + database := &testutil.Database{} + database.On("Close").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Stop") + database.AssertNotCalled(t, "Close") +} + +func TestStopAgentDatabaseFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Stop").Return(nil) + database := &testutil.Database{} + failure := fmt.Errorf("failed to close database") + database.On("Close").Return(failure) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Stop") + 
database.AssertCalled(t, "Close") } diff --git a/agent/builder.go b/agent/builder.go new file mode 100644 index 000000000..8938f9dac --- /dev/null +++ b/agent/builder.go @@ -0,0 +1,72 @@ +package agent + +import ( + "github.com/observiq/stanza/errors" + "github.com/observiq/stanza/operator" + "go.uber.org/zap" +) + +// LogAgentBuilder is a construct used to build a log agent +type LogAgentBuilder struct { + cfg *Config + logger *zap.SugaredLogger + pluginDir string + databaseFile string + defaultOutput operator.Operator +} + +// NewBuilder creates a new LogAgentBuilder +func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { + return &LogAgentBuilder{ + cfg: cfg, + logger: logger, + } +} + +// WithPluginDir adds the specified plugin directory when building a log agent +func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { + b.pluginDir = pluginDir + return b +} + +// WithDatabaseFile adds the specified database file when building a log agent +func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { + b.databaseFile = databaseFile + return b +} + +// WithDefaultOutput adds a default output when building a log agent +func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { + b.defaultOutput = defaultOutput + return b +} + +// Build will build a new log agent using the values defined on the builder +func (b *LogAgentBuilder) Build() (*LogAgent, error) { + database, err := operator.OpenDatabase(b.databaseFile) + if err != nil { + return nil, errors.Wrap(err, "open database") + } + + registry, err := operator.NewPluginRegistry(b.pluginDir) + if err != nil { + return nil, errors.Wrap(err, "load plugin registry") + } + + buildContext := operator.BuildContext{ + Logger: b.logger, + PluginRegistry: registry, + Database: database, + } + + pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) + if err != nil { + return nil, err + } + + return 
&LogAgent{ + pipeline: pipeline, + database: database, + SugaredLogger: b.logger, + }, nil +} diff --git a/agent/builder_test.go b/agent/builder_test.go new file mode 100644 index 000000000..352844918 --- /dev/null +++ b/agent/builder_test.go @@ -0,0 +1,85 @@ +package agent + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/observiq/stanza/pipeline" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestBuildAgentSuccess(t *testing.T) { + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.NoError(t, err) + require.Equal(t, mockLogger, agent.SugaredLogger) +} + +func TestBuildAgentFailureOnDatabase(t *testing.T) { + tempDir := testutil.NewTempDir(t) + invalidDatabaseFile := filepath.Join(tempDir, "test.db") + err := ioutil.WriteFile(invalidDatabaseFile, []byte("invalid"), 0755) + require.NoError(t, err) + + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := invalidDatabaseFile + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.Error(t, err) + require.Nil(t, agent) +} + +func TestBuildAgentFailureOnPluginRegistry(t *testing.T) { + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "[]" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). 
+ Build() + require.Error(t, err) + require.Nil(t, agent) +} + +func TestBuildAgentFailureOnPipeline(t *testing.T) { + mockCfg := Config{ + Pipeline: pipeline.Config{ + pipeline.Params{"type": "missing"}, + }, + } + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.Error(t, err) + require.Nil(t, agent) +} diff --git a/agent/config_test.go b/agent/config_test.go new file mode 100644 index 000000000..77fdbf094 --- /dev/null +++ b/agent/config_test.go @@ -0,0 +1,114 @@ +package agent + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/observiq/stanza/pipeline" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" +) + +func TestNewConfigFromFile(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + - type: operator +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + config, err := NewConfigFromFile(configFile) + require.NoError(t, err) + require.Equal(t, len(config.Pipeline), 1) +} + +func TestNewConfigWithMissingFile(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + + _, err := NewConfigFromFile(configFile) + require.Error(t, err) + require.Contains(t, err.Error(), "no such file or directory") +} + +func TestNewConfigWithInvalidYAML(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + invalid: structure +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + _, err = NewConfigFromFile(configFile) + require.Error(t, err) + 
require.Contains(t, err.Error(), "failed to read config file as yaml") +} + +func TestNewConfigFromGlobs(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + - type: operator +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + globs := []string{filepath.Join(tempDir, "*.yaml")} + config, err := NewConfigFromGlobs(globs) + require.NoError(t, err) + require.Equal(t, len(config.Pipeline), 1) +} + +func TestNewConfigFromGlobsWithInvalidGlob(t *testing.T) { + globs := []string{"[]"} + _, err := NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "syntax error in pattern") +} + +func TestNewConfigFromGlobsWithNoMatches(t *testing.T) { + tempDir := testutil.NewTempDir(t) + globs := []string{filepath.Join(tempDir, "*.yaml")} + _, err := NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "No config files found") +} + +func TestNewConfigFromGlobsWithInvalidConfig(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + invalid: structure +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + globs := []string{filepath.Join(tempDir, "*.yaml")} + _, err = NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to read config file as yaml") +} + +func TestMergeConfigs(t *testing.T) { + config1 := Config{ + Pipeline: pipeline.Config{ + {"type": "first"}, + }, + } + + config2 := Config{ + Pipeline: pipeline.Config{ + {"type": "second"}, + }, + } + + config3 := mergeConfigs(&config1, &config2) + require.Equal(t, len(config3.Pipeline), 2) +} diff --git a/cmd/stanza/graph.go b/cmd/stanza/graph.go index 3c1beb55e..fbae1cf19 100644 --- a/cmd/stanza/graph.go +++ b/cmd/stanza/graph.go @@ -59,7 +59,7 @@ func runGraph(_ 
*cobra.Command, _ []string, flags *RootFlags) { os.Exit(1) } - dotGraph, err := pipeline.MarshalDot() + dotGraph, err := pipeline.Render() if err != nil { logger.Errorw("Failed to marshal dot graph", zap.Any("error", err)) os.Exit(1) diff --git a/cmd/stanza/offsets.go b/cmd/stanza/offsets.go index a013d2c0c..1dc6fcec1 100644 --- a/cmd/stanza/offsets.go +++ b/cmd/stanza/offsets.go @@ -5,7 +5,7 @@ import ( "io" "os" - agent "github.com/observiq/stanza/agent" + "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" "github.com/spf13/cobra" "go.etcd.io/bbolt" @@ -39,7 +39,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command { Short: "Clear persisted offsets from the database", Args: cobra.ArbitraryArgs, Run: func(command *cobra.Command, args []string) { - db, err := agent.OpenDatabase(rootFlags.DatabaseFile) + db, err := operator.OpenDatabase(rootFlags.DatabaseFile) exitOnErr("Failed to open database", err) defer db.Close() defer db.Sync() @@ -90,7 +90,7 @@ func NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command { Short: "List operators with persisted offsets", Args: cobra.NoArgs, Run: func(command *cobra.Command, args []string) { - db, err := agent.OpenDatabase(rootFlags.DatabaseFile) + db, err := operator.OpenDatabase(rootFlags.DatabaseFile) exitOnErr("Failed to open database", err) defer db.Close() diff --git a/cmd/stanza/service.go b/cmd/stanza/service.go index 51161fefa..2d4422308 100644 --- a/cmd/stanza/service.go +++ b/cmd/stanza/service.go @@ -23,14 +23,23 @@ func (a *AgentService) Start(s service.Service) error { if err := a.agent.Start(); err != nil { a.agent.Errorw("Failed to start stanza agent", zap.Any("error", err)) a.cancel() + return nil } + + a.agent.Info("Stanza agent started") return nil } // Stop will stop the stanza agent. 
func (a *AgentService) Stop(s service.Service) error { a.agent.Info("Stopping stanza agent") - a.agent.Stop() + if err := a.agent.Stop(); err != nil { + a.agent.Errorw("Failed to stop stanza agent gracefully", zap.Any("error", err)) + a.cancel() + return nil + } + + a.agent.Info("Stanza agent stopped") a.cancel() return nil } diff --git a/entry/entry_test.go b/entry/entry_test.go index de0f1e1a8..c7b9048ac 100644 --- a/entry/entry_test.go +++ b/entry/entry_test.go @@ -191,3 +191,44 @@ func TestFieldFromString(t *testing.T) { }) } } + +func TestAddLabel(t *testing.T) { + entry := Entry{} + entry.AddLabel("label", "value") + expected := map[string]string{"label": "value"} + require.Equal(t, expected, entry.Labels) +} + +func TestAddResourceKey(t *testing.T) { + entry := Entry{} + entry.AddResourceKey("key", "value") + expected := map[string]string{"key": "value"} + require.Equal(t, expected, entry.Resource) +} + +func TestReadToInterfaceMapWithMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + dest := map[string]interface{}{} + err := entry.readToInterfaceMap(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a map[string]interface{}") +} + +func TestReadToStringMapWithMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + dest := map[string]string{} + err := entry.readToStringMap(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a map[string]string") +} + +func TestReadToInterfaceMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + var dest interface{} + err := entry.readToInterface(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a interface{}") +} diff --git a/entry/field_test.go b/entry/field_test.go index 44acf1b55..a280cd1d9 100644 --- a/entry/field_test.go +++ b/entry/field_test.go @@ -42,6 +42,14 @@ func TestFieldUnmarshalJSON(t *testing.T) 
{ } } +func TestFieldUnmarshalJSONFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f Field + err := json.Unmarshal(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot unmarshal object into Go value of type string") +} + func TestFieldMarshalJSON(t *testing.T) { cases := []struct { name string @@ -114,6 +122,14 @@ func TestFieldUnmarshalYAML(t *testing.T) { } } +func TestFieldUnmarshalYAMLFailure(t *testing.T) { + invalidField := []byte(`invalid: field`) + var f Field + err := yaml.UnmarshalStrict(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot unmarshal !!map into string") +} + func TestFieldMarshalYAML(t *testing.T) { cases := []struct { name string @@ -207,6 +223,9 @@ func TestSplitField(t *testing.T) { {"BracketAtEnd", `$record[`, nil, true}, {"SingleQuoteAtEnd", `$record['`, nil, true}, {"DoubleQuoteAtEnd", `$record["`, nil, true}, + {"BracketMissingQuotes", `$record[test]`, nil, true}, + {"CharacterBetweenBracketAndQuote", `$record["test"a]`, nil, true}, + {"CharacterOutsideBracket", `$record["test"]a`, nil, true}, } for _, tc := range cases { @@ -222,3 +241,21 @@ func TestSplitField(t *testing.T) { }) } } + +func TestFieldFromStringInvalidSplit(t *testing.T) { + _, err := fieldFromString("$resource[test]") + require.Error(t, err) + require.Contains(t, err.Error(), "splitting field") +} + +func TestFieldFromStringWithResource(t *testing.T) { + field, err := fieldFromString(`$resource["test"]`) + require.NoError(t, err) + require.Equal(t, "$resource.test", field.String()) +} + +func TestFieldFromStringWithInvalidResource(t *testing.T) { + _, err := fieldFromString(`$resource["test"]["key"]`) + require.Error(t, err) + require.Contains(t, err.Error(), "resource fields cannot be nested") +} diff --git a/entry/nil_field_test.go b/entry/nil_field_test.go new file mode 100644 index 000000000..089f5be24 --- /dev/null +++ b/entry/nil_field_test.go @@ -0,0 +1,37 @@ +package 
entry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNilFieldGet(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + value, ok := nilField.Get(entry) + require.True(t, ok) + require.Nil(t, value) +} + +func TestNilFieldSet(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + err := nilField.Set(entry, "value") + require.NoError(t, err) + require.Equal(t, *entry, Entry{}) +} + +func TestNilFieldDelete(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + value, ok := nilField.Delete(entry) + require.True(t, ok) + require.Nil(t, value) + require.Equal(t, *entry, Entry{}) +} + +func TestNilFieldString(t *testing.T) { + nilField := NewNilField() + require.Equal(t, "$nil", nilField.String()) +} diff --git a/entry/record_field.go b/entry/record_field.go index 9834e1b8b..10aaedaae 100644 --- a/entry/record_field.go +++ b/entry/record_field.go @@ -84,7 +84,7 @@ func (f RecordField) Set(entry *Entry, value interface{}) error { for i, key := range f.Keys { if i == len(f.Keys)-1 { currentMap[key] = value - return nil + break } currentMap = f.getNestedMap(currentMap, key) } @@ -122,12 +122,12 @@ func (f RecordField) Delete(entry *Entry) (interface{}, bool) { for i, key := range f.Keys { currentMap, ok := currentValue.(map[string]interface{}) if !ok { - return nil, false + break } currentValue, ok = currentMap[key] if !ok { - return nil, false + break } if i == len(f.Keys)-1 { diff --git a/entry/record_field_test.go b/entry/record_field_test.go index 45b31ef7d..071e84681 100644 --- a/entry/record_field_test.go +++ b/entry/record_field_test.go @@ -1,10 +1,12 @@ package entry import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" ) func testRecord() map[string]interface{} { @@ -159,6 +161,14 @@ func TestRecordFieldDelete(t *testing.T) { nestedMap(), true, }, + { + "InvalidNestedKey", + NewRecordField("simple_key", "missing"), 
+ testRecord(), + testRecord(), + nil, + false, + }, } for _, tc := range cases { @@ -194,6 +204,13 @@ func TestRecordFieldSet(t *testing.T) { "new_value", "new_value", }, + { + "OverwriteRawWithMap", + NewRecordField("embedded", "field"), + "raw_value", + "new_value", + map[string]interface{}{"embedded": map[string]interface{}{"field": "new_value"}}, + }, { "NewMapValue", NewRecordField(), @@ -268,7 +285,70 @@ func TestRecordFieldParent(t *testing.T) { }) } -func TestFieldChild(t *testing.T) { +func TestRecordFieldChild(t *testing.T) { field := RecordField{[]string{"parent"}} require.Equal(t, RecordField{[]string{"parent", "child"}}, field.Child("child")) } + +func TestRecordFieldMerge(t *testing.T) { + entry := &Entry{} + entry.Record = "raw_value" + field := RecordField{[]string{"embedded"}} + values := map[string]interface{}{"new": "values"} + field.Merge(entry, values) + expected := map[string]interface{}{"embedded": values} + require.Equal(t, expected, entry.Record) +} + +func TestRecordFieldMarshalJSON(t *testing.T) { + recordField := RecordField{Keys: []string{"test"}} + json, err := recordField.MarshalJSON() + require.NoError(t, err) + require.Equal(t, []byte(`"test"`), json) +} + +func TestRecordFieldUnmarshalJSON(t *testing.T) { + fieldString := []byte(`"test"`) + var f RecordField + err := json.Unmarshal(fieldString, &f) + require.NoError(t, err) + require.Equal(t, RecordField{Keys: []string{"test"}}, f) +} + +func TestRecordFieldUnmarshalJSONFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f RecordField + err := json.Unmarshal(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "the field is not a string: json") +} + +func TestRecordFieldMarshalYAML(t *testing.T) { + recordField := RecordField{Keys: []string{"test"}} + yaml, err := recordField.MarshalYAML() + require.NoError(t, err) + require.Equal(t, "test", yaml) +} + +func TestRecordFieldUnmarshalYAML(t *testing.T) { + invalidField := []byte("test") 
+ var f RecordField + err := yaml.UnmarshalStrict(invalidField, &f) + require.NoError(t, err) + require.Equal(t, RecordField{Keys: []string{"test"}}, f) +} + +func TestRecordFieldUnmarshalYAMLFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f RecordField + err := yaml.UnmarshalStrict(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "the field is not a string: yaml") +} + +func TestRecordFieldFromJSONDot(t *testing.T) { + jsonDot := "$.test" + recordField := fromJSONDot(jsonDot) + expectedField := RecordField{Keys: []string{"test"}} + require.Equal(t, expectedField, recordField) +} diff --git a/operator/config.go b/operator/config.go index 6236aa7eb..91f8b7e33 100644 --- a/operator/config.go +++ b/operator/config.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" - "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -29,36 +28,6 @@ type BuildContext struct { Logger *zap.SugaredLogger } -// Database is a database used to save offsets -type Database interface { - Close() error - Sync() error - Update(func(*bbolt.Tx) error) error - View(func(*bbolt.Tx) error) error -} - -// StubDatabase is an implementation of Database that -// succeeds on all calls without persisting anything to disk. -// This is used when --database is unspecified. -type StubDatabase struct{} - -// Close will be ignored by the stub database -func (d *StubDatabase) Close() error { return nil } - -// Sync will be ignored by the stub database -func (d *StubDatabase) Sync() error { return nil } - -// Update will be ignored by the stub database -func (d *StubDatabase) Update(func(tx *bbolt.Tx) error) error { return nil } - -// View will be ignored by the stub database -func (d *StubDatabase) View(func(tx *bbolt.Tx) error) error { return nil } - -// NewStubDatabase creates a new StubDatabase -func NewStubDatabase() *StubDatabase { - return &StubDatabase{} -} - // registry is a global registry of operator types to operator builders. 
var registry = make(map[string]func() Builder) diff --git a/operator/database.go b/operator/database.go new file mode 100644 index 000000000..b0765409c --- /dev/null +++ b/operator/database.go @@ -0,0 +1,63 @@ +//go:generate mockery -name=^(Database)$ -output=../testutil -outpkg=testutil -case=snake + +package operator + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "go.etcd.io/bbolt" +) + +// Database is a database used to save offsets +type Database interface { + Close() error + Sync() error + Update(func(*bbolt.Tx) error) error + View(func(*bbolt.Tx) error) error +} + +// StubDatabase is an implementation of Database that +// succeeds on all calls without persisting anything to disk. +// This is used when --database is unspecified. +type StubDatabase struct{} + +// Close will be ignored by the stub database +func (d *StubDatabase) Close() error { return nil } + +// Sync will be ignored by the stub database +func (d *StubDatabase) Sync() error { return nil } + +// Update will be ignored by the stub database +func (d *StubDatabase) Update(func(tx *bbolt.Tx) error) error { return nil } + +// View will be ignored by the stub database +func (d *StubDatabase) View(func(tx *bbolt.Tx) error) error { return nil } + +// NewStubDatabase creates a new StubDatabase +func NewStubDatabase() *StubDatabase { + return &StubDatabase{} +} + +// OpenDatabase will open and create a database +func OpenDatabase(file string) (Database, error) { + if file == "" { + return NewStubDatabase(), nil + } + + if _, err := os.Stat(filepath.Dir(file)); err != nil { + if os.IsNotExist(err) { + err := os.MkdirAll(filepath.Dir(file), 0755) + if err != nil { + return nil, fmt.Errorf("creating database directory: %s", err) + } + } else { + return nil, err + } + } + + options := &bbolt.Options{Timeout: 1 * time.Second} + return bbolt.Open(file, 0666, options) +} diff --git a/operator/database_test.go b/operator/database_test.go new file mode 100644 index 000000000..2582d665a --- /dev/null +++ 
b/operator/database_test.go @@ -0,0 +1,38 @@ +package operator + +import ( + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestOpenDatabase(t *testing.T) { + t.Run("Simple", func(t *testing.T) { + tempDir := NewTempDir(t) + db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) + require.NoError(t, err) + require.NotNil(t, db) + }) + + t.Run("NonexistentPathIsCreated", func(t *testing.T) { + tempDir := NewTempDir(t) + db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) + require.NoError(t, err) + require.NotNil(t, db) + }) + + t.Run("BadPermissions", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Windows does not have the same kind of file permissions") + } + tempDir := NewTempDir(t) + err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) + require.NoError(t, err) + db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) + require.Error(t, err) + require.Nil(t, db) + }) +} diff --git a/pipeline/config.go b/pipeline/config.go index f8b5faa49..b2e7c843e 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -14,7 +14,7 @@ import ( type Config []Params // BuildPipeline will build a pipeline from the config. 
-func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput operator.Operator) (*Pipeline, error) { +func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput operator.Operator) (*DirectedPipeline, error) { operatorConfigs, err := c.buildOperatorConfigs(context.PluginRegistry) if err != nil { return nil, err @@ -29,7 +29,7 @@ func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput opera operators = append(operators, defaultOutput) } - pipeline, err := NewPipeline(operators) + pipeline, err := NewDirectedPipeline(operators) if err != nil { return nil, err } diff --git a/pipeline/directed.go b/pipeline/directed.go new file mode 100644 index 000000000..c785da2a4 --- /dev/null +++ b/pipeline/directed.go @@ -0,0 +1,190 @@ +package pipeline + +import ( + "fmt" + "strings" + + "github.com/observiq/stanza/errors" + "github.com/observiq/stanza/operator" + "gonum.org/v1/gonum/graph/encoding/dot" + "gonum.org/v1/gonum/graph/simple" + "gonum.org/v1/gonum/graph/topo" +) + +// DirectedPipeline is a pipeline backed by a directed graph +type DirectedPipeline struct { + Graph *simple.DirectedGraph + running bool +} + +// Start will start the operators in a pipeline in reverse topological order +func (p *DirectedPipeline) Start() error { + if p.running { + return nil + } + + sortedNodes, _ := topo.Sort(p.Graph) + for i := len(sortedNodes) - 1; i >= 0; i-- { + operator := sortedNodes[i].(OperatorNode).Operator() + operator.Logger().Debug("Starting operator") + if err := operator.Start(); err != nil { + return err + } + operator.Logger().Debug("Started operator") + } + + p.running = true + return nil +} + +// Stop will stop the operators in a pipeline in topological order +func (p *DirectedPipeline) Stop() error { + if !p.running { + return nil + } + + sortedNodes, _ := topo.Sort(p.Graph) + for _, node := range sortedNodes { + operator := node.(OperatorNode).Operator() + operator.Logger().Debug("Stopping operator") + _ = 
operator.Stop() + operator.Logger().Debug("Stopped operator") + } + + p.running = false + return nil +} + +// Running will return the running state of the pipeline +func (p *DirectedPipeline) Running() bool { + return p.running +} + +// Render will render the pipeline as a dot graph +func (p *DirectedPipeline) Render() ([]byte, error) { + return dot.Marshal(p.Graph, "G", "", " ") +} + +// addNodes will add operators as nodes to the supplied graph. +func addNodes(graph *simple.DirectedGraph, operators []operator.Operator) error { + for _, operator := range operators { + operatorNode := createOperatorNode(operator) + if graph.Node(operatorNode.ID()) != nil { + return errors.NewError( + fmt.Sprintf("operator with id '%s' already exists in pipeline", operatorNode.Operator().ID()), + "ensure that each operator has a unique `type` or `id`", + ) + } + + graph.AddNode(operatorNode) + } + return nil +} + +// connectNodes will connect the nodes in the supplied graph. +func connectNodes(graph *simple.DirectedGraph) error { + nodes := graph.Nodes() + for nodes.Next() { + node := nodes.Node().(OperatorNode) + if err := connectNode(graph, node); err != nil { + return err + } + } + + if _, err := topo.Sort(graph); err != nil { + return errors.NewError( + "pipeline has a circular dependency", + "ensure that all operators are connected in a straight, acyclic line", + "cycles", unorderableToCycles(err.(topo.Unorderable)), + ) + } + + return nil +} + +// connectNode will connect a node to its outputs in the supplied graph. 
+func connectNode(graph *simple.DirectedGraph, inputNode OperatorNode) error { + for outputOperatorID, outputNodeID := range inputNode.OutputIDs() { + if graph.Node(outputNodeID) == nil { + return errors.NewError( + "operators cannot be connected, because the output does not exist in the pipeline", + "ensure that the output operator is defined", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + outputNode := graph.Node(outputNodeID).(OperatorNode) + if !outputNode.Operator().CanProcess() { + return errors.NewError( + "operators cannot be connected, because the output operator can not process logs", + "ensure that the output operator can process logs (like a parser or destination)", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + if graph.HasEdgeFromTo(inputNode.ID(), outputNodeID) { + return errors.NewError( + "operators cannot be connected, because a connection already exists", + "ensure that only a single connection exists between the two operators", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + edge := graph.NewEdge(inputNode, outputNode) + graph.SetEdge(edge) + } + + return nil +} + +// setOperatorOutputs will set the outputs on operators that can output. 
+func setOperatorOutputs(operators []operator.Operator) error { + for _, operator := range operators { + if !operator.CanOutput() { + continue + } + + if err := operator.SetOutputs(operators); err != nil { + return errors.WithDetails(err, "operator_id", operator.ID()) + } + } + return nil +} + +// NewDirectedPipeline creates a new directed pipeline +func NewDirectedPipeline(operators []operator.Operator) (*DirectedPipeline, error) { + if err := setOperatorOutputs(operators); err != nil { + return nil, err + } + + graph := simple.NewDirectedGraph() + if err := addNodes(graph, operators); err != nil { + return nil, err + } + + if err := connectNodes(graph); err != nil { + return nil, err + } + + return &DirectedPipeline{Graph: graph}, nil +} + +func unorderableToCycles(err topo.Unorderable) string { + var cycles strings.Builder + for i, cycle := range err { + if i != 0 { + cycles.WriteByte(',') + } + cycles.WriteByte('(') + for _, node := range cycle { + cycles.WriteString(node.(OperatorNode).operator.ID()) + cycles.Write([]byte(` -> `)) + } + cycles.WriteString(cycle[0].(OperatorNode).operator.ID()) + cycles.WriteByte(')') + } + return cycles.String() +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index c09fe6f21..0e01a89e0 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -1,184 +1,11 @@ -package pipeline - -import ( - "fmt" - "strings" - - "github.com/observiq/stanza/errors" - "github.com/observiq/stanza/operator" - "gonum.org/v1/gonum/graph/encoding/dot" - "gonum.org/v1/gonum/graph/simple" - "gonum.org/v1/gonum/graph/topo" -) - -// Pipeline is a directed graph of connected operators. -type Pipeline struct { - Graph *simple.DirectedGraph - running bool -} - -// Start will start the operators in a pipeline in reverse topological order. 
-func (p *Pipeline) Start() error { - if p.running { - return nil - } - - sortedNodes, _ := topo.Sort(p.Graph) - for i := len(sortedNodes) - 1; i >= 0; i-- { - operator := sortedNodes[i].(OperatorNode).Operator() - operator.Logger().Debug("Starting operator") - if err := operator.Start(); err != nil { - return err - } - operator.Logger().Debug("Started operator") - } - - p.running = true - return nil -} - -// Stop will stop the operators in a pipeline in topological order. -func (p *Pipeline) Stop() { - if !p.running { - return - } - - sortedNodes, _ := topo.Sort(p.Graph) - for _, node := range sortedNodes { - operator := node.(OperatorNode).Operator() - operator.Logger().Debug("Stopping operator") - _ = operator.Stop() - operator.Logger().Debug("Stopped operator") - } - - p.running = false -} - -// MarshalDot will encode the pipeline as a dot graph. -func (p *Pipeline) MarshalDot() ([]byte, error) { - return dot.Marshal(p.Graph, "G", "", " ") -} - -// addNodes will add operators as nodes to the supplied graph. -func addNodes(graph *simple.DirectedGraph, operators []operator.Operator) error { - for _, operator := range operators { - operatorNode := createOperatorNode(operator) - if graph.Node(operatorNode.ID()) != nil { - return errors.NewError( - fmt.Sprintf("operator with id '%s' already exists in pipeline", operatorNode.Operator().ID()), - "ensure that each operator has a unique `type` or `id`", - ) - } - - graph.AddNode(operatorNode) - } - return nil -} +//go:generate mockery -name=^(Pipeline)$ -output=../testutil -outpkg=testutil -case=snake -// connectNodes will connect the nodes in the supplied graph. 
-func connectNodes(graph *simple.DirectedGraph) error { - nodes := graph.Nodes() - for nodes.Next() { - node := nodes.Node().(OperatorNode) - if err := connectNode(graph, node); err != nil { - return err - } - } - - if _, err := topo.Sort(graph); err != nil { - return errors.NewError( - "pipeline has a circular dependency", - "ensure that all operators are connected in a straight, acyclic line", - "cycles", unorderableToCycles(err.(topo.Unorderable)), - ) - } - - return nil -} - -// connectNode will connect a node to its outputs in the supplied graph. -func connectNode(graph *simple.DirectedGraph, inputNode OperatorNode) error { - for outputOperatorID, outputNodeID := range inputNode.OutputIDs() { - if graph.Node(outputNodeID) == nil { - return errors.NewError( - "operators cannot be connected, because the output does not exist in the pipeline", - "ensure that the output operator is defined", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - outputNode := graph.Node(outputNodeID).(OperatorNode) - if !outputNode.Operator().CanProcess() { - return errors.NewError( - "operators cannot be connected, because the output operator can not process logs", - "ensure that the output operator can process logs (like a parser or destination)", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - if graph.HasEdgeFromTo(inputNode.ID(), outputNodeID) { - return errors.NewError( - "operators cannot be connected, because a connection already exists", - "ensure that only a single connection exists between the two operators", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - edge := graph.NewEdge(inputNode, outputNode) - graph.SetEdge(edge) - } - - return nil -} - -// setOperatorOutputs will set the outputs on operators that can output. 
-func setOperatorOutputs(operators []operator.Operator) error { - for _, operator := range operators { - if !operator.CanOutput() { - continue - } - - if err := operator.SetOutputs(operators); err != nil { - return errors.WithDetails(err, "operator_id", operator.ID()) - } - } - return nil -} - -// NewPipeline creates a new pipeline of connected operators. -func NewPipeline(operators []operator.Operator) (*Pipeline, error) { - if err := setOperatorOutputs(operators); err != nil { - return nil, err - } - - graph := simple.NewDirectedGraph() - if err := addNodes(graph, operators); err != nil { - return nil, err - } - - if err := connectNodes(graph); err != nil { - return nil, err - } - - return &Pipeline{Graph: graph}, nil -} +package pipeline -func unorderableToCycles(err topo.Unorderable) string { - var cycles strings.Builder - for i, cycle := range err { - if i != 0 { - cycles.WriteByte(',') - } - cycles.WriteByte('(') - for _, node := range cycle { - cycles.WriteString(node.(OperatorNode).operator.ID()) - cycles.Write([]byte(` -> `)) - } - cycles.WriteString(cycle[0].(OperatorNode).operator.ID()) - cycles.WriteByte(')') - } - return cycles.String() +// Pipeline is a collection of connected operators that exchange entries +type Pipeline interface { + Start() error + Stop() error + Render() ([]byte, error) + Running() bool } diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 2cb68a116..7629034c7 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -67,7 +67,7 @@ func TestUnorderableToCycles(t *testing.T) { func TestPipeline(t *testing.T) { t.Run("MultipleStart", func(t *testing.T) { - pipeline, err := NewPipeline([]operator.Operator{}) + pipeline, err := NewDirectedPipeline([]operator.Operator{}) require.NoError(t, err) err = pipeline.Start() @@ -80,7 +80,7 @@ func TestPipeline(t *testing.T) { }) t.Run("MultipleStop", func(t *testing.T) { - pipeline, err := NewPipeline([]operator.Operator{}) + pipeline, err := 
NewDirectedPipeline([]operator.Operator{}) require.NoError(t, err) err = pipeline.Start() @@ -98,7 +98,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return(nil) - _, err := NewPipeline([]operator.Operator{operator1, operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator1, operator2}) require.Error(t, err) require.Contains(t, err.Error(), "already exists") }) @@ -112,7 +112,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return([]operator.Operator{operator1}) - _, err := NewPipeline([]operator.Operator{operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator2}) require.Error(t, err) require.Contains(t, err.Error(), "does not exist") }) @@ -129,7 +129,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return([]operator.Operator{operator1}) - _, err := NewPipeline([]operator.Operator{operator1, operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator1, operator2}) require.Error(t, err) require.Contains(t, err.Error(), "can not process") }) @@ -168,7 +168,7 @@ func TestPipeline(t *testing.T) { mockOperator3.On("Outputs").Return([]operator.Operator{mockOperator1}) mockOperator3.On("SetOutputs", mock.Anything).Return(nil) - _, err := NewPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + _, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) require.Error(t, err) require.Contains(t, err.Error(), "circular dependency") }) diff --git a/testutil/database.go b/testutil/database.go new file mode 100644 index 000000000..f2ab136a0 --- /dev/null +++ b/testutil/database.go @@ -0,0 +1,69 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. 
+ +package testutil + +import ( + mock "github.com/stretchr/testify/mock" + bbolt "go.etcd.io/bbolt" +) + +// Database is an autogenerated mock type for the Database type +type Database struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Database) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Sync provides a mock function with given fields: +func (_m *Database) Sync() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Update provides a mock function with given fields: _a0 +func (_m *Database) Update(_a0 func(*bbolt.Tx) error) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(func(*bbolt.Tx) error) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// View provides a mock function with given fields: _a0 +func (_m *Database) View(_a0 func(*bbolt.Tx) error) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(func(*bbolt.Tx) error) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/testutil/pipeline.go b/testutil/pipeline.go new file mode 100644 index 000000000..2152bd92f --- /dev/null +++ b/testutil/pipeline.go @@ -0,0 +1,75 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. 
+ +package testutil + +import mock "github.com/stretchr/testify/mock" + +// Pipeline is an autogenerated mock type for the Pipeline type +type Pipeline struct { + mock.Mock +} + +// Render provides a mock function with given fields: +func (_m *Pipeline) Render() ([]byte, error) { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Running provides a mock function with given fields: +func (_m *Pipeline) Running() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Start provides a mock function with given fields: +func (_m *Pipeline) Start() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stop provides a mock function with given fields: +func (_m *Pipeline) Stop() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +}