diff --git a/agent/agent.go b/agent/agent.go index 5f243c62b..f3d6e253d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1,23 +1,17 @@ package agent import ( - "fmt" - "os" - "path/filepath" "sync" - "time" - "github.com/observiq/stanza/errors" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/database" "github.com/observiq/stanza/pipeline" - "go.etcd.io/bbolt" "go.uber.org/zap" ) // LogAgent is an entity that handles log monitoring. type LogAgent struct { - database operator.Database - pipeline *pipeline.Pipeline + database database.Database + pipeline pipeline.Pipeline startOnce sync.Once stopOnce sync.Once @@ -25,109 +19,29 @@ type LogAgent struct { *zap.SugaredLogger } -// Start will start the log monitoring process. +// Start will start the log monitoring process func (a *LogAgent) Start() (err error) { a.startOnce.Do(func() { err = a.pipeline.Start() if err != nil { return } - a.Info("Agent started") }) return } -// Stop will stop the log monitoring process. -func (a *LogAgent) Stop() { +// Stop will stop the log monitoring process +func (a *LogAgent) Stop() (err error) { a.stopOnce.Do(func() { - a.pipeline.Stop() - a.database.Close() - a.Info("Agent stopped") - }) -} - -// OpenDatabase will open and create a database. 
-func OpenDatabase(file string) (operator.Database, error) { - if file == "" { - return operator.NewStubDatabase(), nil - } - - if _, err := os.Stat(filepath.Dir(file)); err != nil { - if os.IsNotExist(err) { - err := os.MkdirAll(filepath.Dir(file), 0755) - if err != nil { - return nil, fmt.Errorf("creating database directory: %s", err) - } - } else { - return nil, err + err = a.pipeline.Stop() + if err != nil { + return } - } - - options := &bbolt.Options{Timeout: 1 * time.Second} - return bbolt.Open(file, 0666, options) -} - -// LogAgentBuilder is a construct used to build a log agent -type LogAgentBuilder struct { - cfg *Config - logger *zap.SugaredLogger - pluginDir string - databaseFile string - defaultOutput operator.Operator -} - -// NewBuilder creates a new LogAgentBuilder -func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { - return &LogAgentBuilder{ - cfg: cfg, - logger: logger, - } -} - -// WithPluginDir adds the specified plugin directory when building a log agent -func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { - b.pluginDir = pluginDir - return b -} -// WithDatabaseFile adds the specified database file when building a log agent -func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { - b.databaseFile = databaseFile - return b -} - -// WithDefaultOutput adds a default output when building a log agent -func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { - b.defaultOutput = defaultOutput - return b -} - -// Build will build a new log agent using the values defined on the builder -func (b *LogAgentBuilder) Build() (*LogAgent, error) { - database, err := OpenDatabase(b.databaseFile) - if err != nil { - return nil, errors.Wrap(err, "open database") - } - - registry, err := operator.NewPluginRegistry(b.pluginDir) - if err != nil { - return nil, errors.Wrap(err, "load plugin registry") - } - - buildContext := operator.BuildContext{ - 
Logger: b.logger, - PluginRegistry: registry, - Database: database, - } - - pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput) - if err != nil { - return nil, err - } - - return &LogAgent{ - pipeline: pipeline, - database: database, - SugaredLogger: b.logger, - }, nil + err = a.database.Close() + if err != nil { + return + } + }) + return } diff --git a/agent/agent_test.go b/agent/agent_test.go index 925b764c8..c8e536462 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1,9 +1,7 @@ package agent import ( - "os" - "path/filepath" - "runtime" + "fmt" "testing" "github.com/observiq/stanza/testutil" @@ -11,44 +9,87 @@ import ( "go.uber.org/zap" ) -func TestNewLogAgent(t *testing.T) { - mockCfg := Config{} - mockLogger := zap.NewNop().Sugar() - mockPluginDir := "/some/path/plugins" - mockDatabaseFile := "" - agent, err := NewBuilder(&mockCfg, mockLogger). - WithPluginDir(mockPluginDir). - WithDatabaseFile(mockDatabaseFile). - Build() +func TestStartAgentSuccess(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Start").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + } + err := agent.Start() require.NoError(t, err) + pipeline.AssertCalled(t, "Start") +} + +func TestStartAgentFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + failure := fmt.Errorf("failed to start pipeline") + pipeline.On("Start").Return(failure) - require.Equal(t, mockLogger, agent.SugaredLogger) + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + } + err := agent.Start() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Start") } -func TestOpenDatabase(t *testing.T) { - t.Run("Simple", func(t *testing.T) { - tempDir := testutil.NewTempDir(t) - db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) - require.NoError(t, err) - require.NotNil(t, db) - }) - - t.Run("NonexistantPathIsCreated", func(t *testing.T) 
{ - tempDir := testutil.NewTempDir(t) - db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) - require.NoError(t, err) - require.NotNil(t, db) - }) - - t.Run("BadPermissions", func(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Windows does not have the same kind of file permissions") - } - tempDir := testutil.NewTempDir(t) - err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) - require.NoError(t, err) - db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) - require.Error(t, err) - require.Nil(t, db) - }) +func TestStopAgentSuccess(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Stop").Return(nil) + database := &testutil.Database{} + database.On("Close").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.NoError(t, err) + pipeline.AssertCalled(t, "Stop") + database.AssertCalled(t, "Close") +} + +func TestStopAgentPipelineFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + failure := fmt.Errorf("failed to start pipeline") + pipeline.On("Stop").Return(failure) + database := &testutil.Database{} + database.On("Close").Return(nil) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Stop") + database.AssertNotCalled(t, "Close") +} + +func TestStopAgentDatabaseFailure(t *testing.T) { + logger := zap.NewNop().Sugar() + pipeline := &testutil.Pipeline{} + pipeline.On("Stop").Return(nil) + database := &testutil.Database{} + failure := fmt.Errorf("failed to close database") + database.On("Close").Return(failure) + + agent := LogAgent{ + SugaredLogger: logger, + pipeline: pipeline, + database: database, + } + err := agent.Stop() + require.Error(t, err, failure) + pipeline.AssertCalled(t, "Stop") + 
database.AssertCalled(t, "Close") } diff --git a/agent/builder.go b/agent/builder.go new file mode 100644 index 000000000..a3b3636fb --- /dev/null +++ b/agent/builder.go @@ -0,0 +1,73 @@ +package agent + +import ( + "github.com/observiq/stanza/database" + "github.com/observiq/stanza/errors" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/plugin" + "go.uber.org/zap" +) + +// LogAgentBuilder is a construct used to build a log agent +type LogAgentBuilder struct { + cfg *Config + logger *zap.SugaredLogger + pluginDir string + databaseFile string + defaultOutput operator.Operator +} + +// NewBuilder creates a new LogAgentBuilder +func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder { + return &LogAgentBuilder{ + cfg: cfg, + logger: logger, + } +} + +// WithPluginDir adds the specified plugin directory when building a log agent +func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder { + b.pluginDir = pluginDir + return b +} + +// WithDatabaseFile adds the specified database file when building a log agent +func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder { + b.databaseFile = databaseFile + return b +} + +// WithDefaultOutput adds a default output when building a log agent +func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder { + b.defaultOutput = defaultOutput + return b +} + +// Build will build a new log agent using the values defined on the builder +func (b *LogAgentBuilder) Build() (*LogAgent, error) { + db, err := database.OpenDatabase(b.databaseFile) + if err != nil { + return nil, errors.Wrap(err, "open database") + } + + registry, err := plugin.NewPluginRegistry(b.pluginDir) + if err != nil { + return nil, errors.Wrap(err, "load plugin registry") + } + + buildContext := operator.BuildContext{ + Logger: b.logger, + Database: db, + } + + pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, registry, b.defaultOutput) + if err 
!= nil { + return nil, err + } + + return &LogAgent{ + pipeline: pipeline, + database: db, + SugaredLogger: b.logger, + }, nil +} diff --git a/agent/builder_test.go b/agent/builder_test.go new file mode 100644 index 000000000..352844918 --- /dev/null +++ b/agent/builder_test.go @@ -0,0 +1,85 @@ +package agent + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/observiq/stanza/pipeline" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestBuildAgentSuccess(t *testing.T) { + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.NoError(t, err) + require.Equal(t, mockLogger, agent.SugaredLogger) +} + +func TestBuildAgentFailureOnDatabase(t *testing.T) { + tempDir := testutil.NewTempDir(t) + invalidDatabaseFile := filepath.Join(tempDir, "test.db") + err := ioutil.WriteFile(invalidDatabaseFile, []byte("invalid"), 0755) + require.NoError(t, err) + + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := invalidDatabaseFile + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.Error(t, err) + require.Nil(t, agent) +} + +func TestBuildAgentFailureOnPluginRegistry(t *testing.T) { + mockCfg := Config{} + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "[]" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). 
+ Build() + require.Error(t, err) + require.Nil(t, agent) +} + +func TestBuildAgentFailureOnPipeline(t *testing.T) { + mockCfg := Config{ + Pipeline: pipeline.Config{ + pipeline.Params{"type": "missing"}, + }, + } + mockLogger := zap.NewNop().Sugar() + mockPluginDir := "/some/path/plugins" + mockDatabaseFile := "" + mockOutput := testutil.NewFakeOutput(t) + + agent, err := NewBuilder(&mockCfg, mockLogger). + WithPluginDir(mockPluginDir). + WithDatabaseFile(mockDatabaseFile). + WithDefaultOutput(mockOutput). + Build() + require.Error(t, err) + require.Nil(t, agent) +} diff --git a/agent/config_test.go b/agent/config_test.go new file mode 100644 index 000000000..77fdbf094 --- /dev/null +++ b/agent/config_test.go @@ -0,0 +1,114 @@ +package agent + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/observiq/stanza/pipeline" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" +) + +func TestNewConfigFromFile(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + - type: operator +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + config, err := NewConfigFromFile(configFile) + require.NoError(t, err) + require.Equal(t, len(config.Pipeline), 1) +} + +func TestNewConfigWithMissingFile(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + + _, err := NewConfigFromFile(configFile) + require.Error(t, err) + require.Contains(t, err.Error(), "no such file or directory") +} + +func TestNewConfigWithInvalidYAML(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + invalid: structure +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + _, err = NewConfigFromFile(configFile) + require.Error(t, err) + 
require.Contains(t, err.Error(), "failed to read config file as yaml") +} + +func TestNewConfigFromGlobs(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + - type: operator +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + globs := []string{filepath.Join(tempDir, "*.yaml")} + config, err := NewConfigFromGlobs(globs) + require.NoError(t, err) + require.Equal(t, len(config.Pipeline), 1) +} + +func TestNewConfigFromGlobsWithInvalidGlob(t *testing.T) { + globs := []string{"[]"} + _, err := NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "syntax error in pattern") +} + +func TestNewConfigFromGlobsWithNoMatches(t *testing.T) { + tempDir := testutil.NewTempDir(t) + globs := []string{filepath.Join(tempDir, "*.yaml")} + _, err := NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "No config files found") +} + +func TestNewConfigFromGlobsWithInvalidConfig(t *testing.T) { + tempDir := testutil.NewTempDir(t) + configFile := filepath.Join(tempDir, "config.yaml") + configContents := ` +pipeline: + invalid: structure +` + err := ioutil.WriteFile(configFile, []byte(configContents), 0755) + require.NoError(t, err) + + globs := []string{filepath.Join(tempDir, "*.yaml")} + _, err = NewConfigFromGlobs(globs) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to read config file as yaml") +} + +func TestMergeConfigs(t *testing.T) { + config1 := Config{ + Pipeline: pipeline.Config{ + {"type": "first"}, + }, + } + + config2 := Config{ + Pipeline: pipeline.Config{ + {"type": "second"}, + }, + } + + config3 := mergeConfigs(&config1, &config2) + require.Equal(t, len(config3.Pipeline), 2) +} diff --git a/cmd/stanza/graph.go b/cmd/stanza/graph.go index a0de85db0..89d0c24bd 100644 --- a/cmd/stanza/graph.go +++ b/cmd/stanza/graph.go @@ -4,8 +4,9 @@ import ( "os" 
"github.com/observiq/stanza/agent" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/database" pg "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/plugin" "github.com/spf13/cobra" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -43,24 +44,23 @@ func runGraph(_ *cobra.Command, _ []string, flags *RootFlags) { os.Exit(1) } - pluginRegistry, err := operator.NewPluginRegistry(flags.PluginDir) + pluginRegistry, err := plugin.NewPluginRegistry(flags.PluginDir) if err != nil { logger.Errorw("Failed to load plugin registry", zap.Any("error", err)) } buildContext := pg.BuildContext{ - Database: operator.NewStubDatabase(), - PluginRegistry: pluginRegistry, - Logger: logger, + Database: database.NewStubDatabase(), + Logger: logger, } - pipeline, err := cfg.Pipeline.BuildPipeline(buildContext, nil) + pipeline, err := cfg.Pipeline.BuildPipeline(buildContext, pluginRegistry, nil) if err != nil { logger.Errorw("Failed to build operator pipeline", zap.Any("error", err)) os.Exit(1) } - dotGraph, err := pipeline.MarshalDot() + dotGraph, err := pipeline.Render() if err != nil { logger.Errorw("Failed to marshal dot graph", zap.Any("error", err)) os.Exit(1) diff --git a/cmd/stanza/offsets.go b/cmd/stanza/offsets.go index ae6429c43..31c2b2e53 100644 --- a/cmd/stanza/offsets.go +++ b/cmd/stanza/offsets.go @@ -5,7 +5,7 @@ import ( "io" "os" - agent "github.com/observiq/stanza/agent" + "github.com/observiq/stanza/database" "github.com/observiq/stanza/operator/helper" "github.com/spf13/cobra" "go.etcd.io/bbolt" @@ -39,7 +39,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command { Short: "Clear persisted offsets from the database", Args: cobra.ArbitraryArgs, Run: func(command *cobra.Command, args []string) { - db, err := agent.OpenDatabase(rootFlags.DatabaseFile) + db, err := database.OpenDatabase(rootFlags.DatabaseFile) exitOnErr("Failed to open database", err) defer db.Close() defer func() { _ = db.Sync() }() @@ -90,7 +90,7 @@ func 
NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command { Short: "List operators with persisted offsets", Args: cobra.NoArgs, Run: func(command *cobra.Command, args []string) { - db, err := agent.OpenDatabase(rootFlags.DatabaseFile) + db, err := database.OpenDatabase(rootFlags.DatabaseFile) exitOnErr("Failed to open database", err) defer db.Close() diff --git a/cmd/stanza/offsets_test.go b/cmd/stanza/offsets_test.go index c2f5d25ce..6c91e9295 100644 --- a/cmd/stanza/offsets_test.go +++ b/cmd/stanza/offsets_test.go @@ -7,7 +7,7 @@ import ( "path/filepath" "testing" - agent "github.com/observiq/stanza/agent" + "github.com/observiq/stanza/database" "github.com/observiq/stanza/operator/helper" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" @@ -27,7 +27,7 @@ func TestOffsets(t *testing.T) { stdout = buf // add an offset to the database - db, err := agent.OpenDatabase(databasePath) + db, err := database.OpenDatabase(databasePath) require.NoError(t, err) db.Update(func(tx *bbolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(helper.OffsetsBucket) diff --git a/cmd/stanza/service.go b/cmd/stanza/service.go index 51161fefa..2d4422308 100644 --- a/cmd/stanza/service.go +++ b/cmd/stanza/service.go @@ -23,14 +23,23 @@ func (a *AgentService) Start(s service.Service) error { if err := a.agent.Start(); err != nil { a.agent.Errorw("Failed to start stanza agent", zap.Any("error", err)) a.cancel() + return nil } + + a.agent.Info("Stanza agent started") return nil } // Stop will stop the stanza agent. 
func (a *AgentService) Stop(s service.Service) error { a.agent.Info("Stopping stanza agent") - a.agent.Stop() + if err := a.agent.Stop(); err != nil { + a.agent.Errorw("Failed to stop stanza agent gracefully", zap.Any("error", err)) + a.cancel() + return nil + } + + a.agent.Info("Stanza agent stopped") a.cancel() return nil } diff --git a/database/database.go b/database/database.go new file mode 100644 index 000000000..53304523a --- /dev/null +++ b/database/database.go @@ -0,0 +1,63 @@ +//go:generate mockery --name=^(Database)$ --output=../testutil --outpkg=testutil --case=snake + +package database + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "go.etcd.io/bbolt" +) + +// Database is a database used to save offsets +type Database interface { + Close() error + Sync() error + Update(func(*bbolt.Tx) error) error + View(func(*bbolt.Tx) error) error +} + +// StubDatabase is an implementation of Database that +// succeeds on all calls without persisting anything to disk. +// This is used when --database is unspecified. 
+type StubDatabase struct{} + +// Close will be ignored by the stub database +func (d *StubDatabase) Close() error { return nil } + +// Sync will be ignored by the stub database +func (d *StubDatabase) Sync() error { return nil } + +// Update will be ignored by the stub database +func (d *StubDatabase) Update(func(tx *bbolt.Tx) error) error { return nil } + +// View will be ignored by the stub database +func (d *StubDatabase) View(func(tx *bbolt.Tx) error) error { return nil } + +// NewStubDatabase creates a new StubDatabase +func NewStubDatabase() *StubDatabase { + return &StubDatabase{} +} + +// OpenDatabase will open and create a database +func OpenDatabase(file string) (Database, error) { + if file == "" { + return NewStubDatabase(), nil + } + + if _, err := os.Stat(filepath.Dir(file)); err != nil { + if os.IsNotExist(err) { + err := os.MkdirAll(filepath.Dir(file), 0755) + if err != nil { + return nil, fmt.Errorf("creating database directory: %s", err) + } + } else { + return nil, err + } + } + + options := &bbolt.Options{Timeout: 1 * time.Second} + return bbolt.Open(file, 0666, options) +} diff --git a/database/database_test.go b/database/database_test.go new file mode 100644 index 000000000..c7c524cd1 --- /dev/null +++ b/database/database_test.go @@ -0,0 +1,89 @@ +package database + +import ( + "io/ioutil" + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/require" +) + +// NewTempDir will return a new temp directory for testing +func NewTempDir(t testing.TB) string { + tempDir, err := ioutil.TempDir("", "") + if err != nil { + t.Errorf(err.Error()) + t.FailNow() + } + + t.Cleanup(func() { + os.RemoveAll(tempDir) + }) + + return tempDir +} + +func TestOpenDatabase(t *testing.T) { + t.Run("Simple", func(t *testing.T) { + tempDir := NewTempDir(t) + db, err := OpenDatabase(filepath.Join(tempDir, "test.db")) + require.NoError(t, err) + require.NotNil(t, db) + }) + + t.Run("NoFile", func(t *testing.T) { + db, err := OpenDatabase("") 
+ require.NoError(t, err) + require.NotNil(t, db) + require.IsType(t, &StubDatabase{}, db) + }) + + t.Run("NonexistantPathIsCreated", func(t *testing.T) { + tempDir := NewTempDir(t) + db, err := OpenDatabase(filepath.Join(tempDir, "nonexistdir", "test.db")) + require.NoError(t, err) + require.NotNil(t, db) + }) + + t.Run("BadPermissions", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Windows does not have the same kind of file permissions") + } + tempDir := NewTempDir(t) + err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0666) + require.NoError(t, err) + db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) + require.Error(t, err) + require.Nil(t, db) + }) + + t.Run("ExecuteOnlyPermissions", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Windows does not have the same kind of file permissions") + } + tempDir := NewTempDir(t) + err := os.MkdirAll(filepath.Join(tempDir, "badperms"), 0111) + require.NoError(t, err) + db, err := OpenDatabase(filepath.Join(tempDir, "badperms", "nonexistdir", "test.db")) + require.Error(t, err) + require.Nil(t, db) + }) + +} + +func TestStubDatabase(t *testing.T) { + stubDatabase := NewStubDatabase() + err := stubDatabase.Close() + require.NoError(t, err) + + err = stubDatabase.Sync() + require.NoError(t, err) + + err = stubDatabase.Update(nil) + require.NoError(t, err) + + err = stubDatabase.View(nil) + require.NoError(t, err) +} diff --git a/entry/entry_test.go b/entry/entry_test.go index de0f1e1a8..c7b9048ac 100644 --- a/entry/entry_test.go +++ b/entry/entry_test.go @@ -191,3 +191,44 @@ func TestFieldFromString(t *testing.T) { }) } } + +func TestAddLabel(t *testing.T) { + entry := Entry{} + entry.AddLabel("label", "value") + expected := map[string]string{"label": "value"} + require.Equal(t, expected, entry.Labels) +} + +func TestAddResourceKey(t *testing.T) { + entry := Entry{} + entry.AddResourceKey("key", "value") + expected := map[string]string{"key": "value"} 
+ require.Equal(t, expected, entry.Resource) +} + +func TestReadToInterfaceMapWithMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + dest := map[string]interface{}{} + err := entry.readToInterfaceMap(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a map[string]interface{}") +} + +func TestReadToStringMapWithMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + dest := map[string]string{} + err := entry.readToStringMap(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a map[string]string") +} + +func TestReadToInterfaceMissingField(t *testing.T) { + entry := Entry{} + field := NewLabelField("label") + var dest interface{} + err := entry.readToInterface(field, &dest) + require.Error(t, err) + require.Contains(t, err.Error(), "can not be read as a interface{}") +} diff --git a/entry/field_test.go b/entry/field_test.go index 44acf1b55..a280cd1d9 100644 --- a/entry/field_test.go +++ b/entry/field_test.go @@ -42,6 +42,14 @@ func TestFieldUnmarshalJSON(t *testing.T) { } } +func TestFieldUnmarshalJSONFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f Field + err := json.Unmarshal(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot unmarshal object into Go value of type string") +} + func TestFieldMarshalJSON(t *testing.T) { cases := []struct { name string @@ -114,6 +122,14 @@ func TestFieldUnmarshalYAML(t *testing.T) { } } +func TestFieldUnmarshalYAMLFailure(t *testing.T) { + invalidField := []byte(`invalid: field`) + var f Field + err := yaml.UnmarshalStrict(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot unmarshal !!map into string") +} + func TestFieldMarshalYAML(t *testing.T) { cases := []struct { name string @@ -207,6 +223,9 @@ func TestSplitField(t *testing.T) { {"BracketAtEnd", `$record[`, nil, true}, {"SingleQuoteAtEnd", 
`$record['`, nil, true}, {"DoubleQuoteAtEnd", `$record["`, nil, true}, + {"BracketMissingQuotes", `$record[test]`, nil, true}, + {"CharacterBetweenBracketAndQuote", `$record["test"a]`, nil, true}, + {"CharacterOutsideBracket", `$record["test"]a`, nil, true}, } for _, tc := range cases { @@ -222,3 +241,21 @@ func TestSplitField(t *testing.T) { }) } } + +func TestFieldFromStringInvalidSplit(t *testing.T) { + _, err := fieldFromString("$resource[test]") + require.Error(t, err) + require.Contains(t, err.Error(), "splitting field") +} + +func TestFieldFromStringWithResource(t *testing.T) { + field, err := fieldFromString(`$resource["test"]`) + require.NoError(t, err) + require.Equal(t, "$resource.test", field.String()) +} + +func TestFieldFromStringWithInvalidResource(t *testing.T) { + _, err := fieldFromString(`$resource["test"]["key"]`) + require.Error(t, err) + require.Contains(t, err.Error(), "resource fields cannot be nested") +} diff --git a/entry/nil_field_test.go b/entry/nil_field_test.go new file mode 100644 index 000000000..089f5be24 --- /dev/null +++ b/entry/nil_field_test.go @@ -0,0 +1,37 @@ +package entry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNilFieldGet(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + value, ok := nilField.Get(entry) + require.True(t, ok) + require.Nil(t, value) +} + +func TestNilFieldSet(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + err := nilField.Set(entry, "value") + require.NoError(t, err) + require.Equal(t, *entry, Entry{}) +} + +func TestNilFieldDelete(t *testing.T) { + entry := &Entry{} + nilField := NewNilField() + value, ok := nilField.Delete(entry) + require.True(t, ok) + require.Nil(t, value) + require.Equal(t, *entry, Entry{}) +} + +func TestNilFieldString(t *testing.T) { + nilField := NewNilField() + require.Equal(t, "$nil", nilField.String()) +} diff --git a/entry/record_field.go b/entry/record_field.go index e952ac6a1..da8fb2cde 100644 --- 
a/entry/record_field.go +++ b/entry/record_field.go @@ -84,7 +84,7 @@ func (f RecordField) Set(entry *Entry, value interface{}) error { for i, key := range f.Keys { if i == len(f.Keys)-1 { currentMap[key] = value - return nil + break } currentMap = f.getNestedMap(currentMap, key) } @@ -122,12 +122,12 @@ func (f RecordField) Delete(entry *Entry) (interface{}, bool) { for i, key := range f.Keys { currentMap, ok := currentValue.(map[string]interface{}) if !ok { - return nil, false + break } currentValue, ok = currentMap[key] if !ok { - return nil, false + break } if i == len(f.Keys)-1 { diff --git a/entry/record_field_test.go b/entry/record_field_test.go index dec7714d2..2c21428ba 100644 --- a/entry/record_field_test.go +++ b/entry/record_field_test.go @@ -1,10 +1,12 @@ package entry import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" ) func testRecord() map[string]interface{} { @@ -159,6 +161,14 @@ func TestRecordFieldDelete(t *testing.T) { nestedMap(), true, }, + { + "InvalidNestedKey", + NewRecordField("simple_key", "missing"), + testRecord(), + testRecord(), + nil, + false, + }, } for _, tc := range cases { @@ -194,6 +204,13 @@ func TestRecordFieldSet(t *testing.T) { "new_value", "new_value", }, + { + "OverwriteRawWithMap", + NewRecordField("embedded", "field"), + "raw_value", + "new_value", + map[string]interface{}{"embedded": map[string]interface{}{"field": "new_value"}}, + }, { "NewMapValue", NewRecordField(), @@ -268,7 +285,70 @@ func TestRecordFieldParent(t *testing.T) { }) } -func TestFieldChild(t *testing.T) { +func TestRecordFieldChild(t *testing.T) { field := RecordField{[]string{"parent"}} require.Equal(t, RecordField{[]string{"parent", "child"}}, field.Child("child")) } + +func TestRecordFieldMerge(t *testing.T) { + entry := &Entry{} + entry.Record = "raw_value" + field := RecordField{[]string{"embedded"}} + values := map[string]interface{}{"new": "values"} + 
field.Merge(entry, values) + expected := map[string]interface{}{"embedded": values} + require.Equal(t, expected, entry.Record) +} + +func TestRecordFieldMarshalJSON(t *testing.T) { + recordField := RecordField{Keys: []string{"test"}} + json, err := recordField.MarshalJSON() + require.NoError(t, err) + require.Equal(t, []byte(`"test"`), json) +} + +func TestRecordFieldUnmarshalJSON(t *testing.T) { + fieldString := []byte(`"test"`) + var f RecordField + err := json.Unmarshal(fieldString, &f) + require.NoError(t, err) + require.Equal(t, RecordField{Keys: []string{"test"}}, f) +} + +func TestRecordFieldUnmarshalJSONFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f RecordField + err := json.Unmarshal(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "the field is not a string: json") +} + +func TestRecordFieldMarshalYAML(t *testing.T) { + recordField := RecordField{Keys: []string{"test"}} + yaml, err := recordField.MarshalYAML() + require.NoError(t, err) + require.Equal(t, "test", yaml) +} + +func TestRecordFieldUnmarshalYAML(t *testing.T) { + invalidField := []byte("test") + var f RecordField + err := yaml.UnmarshalStrict(invalidField, &f) + require.NoError(t, err) + require.Equal(t, RecordField{Keys: []string{"test"}}, f) +} + +func TestRecordFieldUnmarshalYAMLFailure(t *testing.T) { + invalidField := []byte(`{"key":"value"}`) + var f RecordField + err := yaml.UnmarshalStrict(invalidField, &f) + require.Error(t, err) + require.Contains(t, err.Error(), "the field is not a string: yaml") +} + +func TestRecordFieldFromJSONDot(t *testing.T) { + jsonDot := "$.test" + recordField := fromJSONDot(jsonDot) + expectedField := RecordField{Keys: []string{"test"}} + require.Equal(t, expectedField, recordField) +} diff --git a/errors/error.go b/errors/error.go index 8af2fb16e..5533846de 100644 --- a/errors/error.go +++ b/errors/error.go @@ -32,10 +32,7 @@ func (e AgentError) MarshalLogObject(encoder zapcore.ObjectEncoder) error { 
} if len(e.Details) != 0 { - err := encoder.AddObject("details", e.Details) - if err != nil { - return err - } + _ = encoder.AddObject("details", e.Details) } return nil diff --git a/internal/version/version_test.go b/internal/version/version_test.go new file mode 100644 index 000000000..ff0c1f54d --- /dev/null +++ b/internal/version/version_test.go @@ -0,0 +1,29 @@ +package version + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func resetVersion() { + Version = "" + GitHash = "" +} + +func TestGetVersionWithVersion(t *testing.T) { + Version = "0.1.1" + defer resetVersion() + require.Equal(t, Version, GetVersion()) +} + +func TestGetVersionWithGitHash(t *testing.T) { + GitHash = "git hash" + defer resetVersion() + require.Equal(t, GitHash, GetVersion()) +} + +func TestGetVersionWithUnknownVersion(t *testing.T) { + defer resetVersion() + require.Equal(t, "unknown", GetVersion()) +} diff --git a/operator/buffer/memory.go b/operator/buffer/memory.go index c9fae37ff..e641d6adc 100644 --- a/operator/buffer/memory.go +++ b/operator/buffer/memory.go @@ -9,6 +9,7 @@ import ( "sync" "sync/atomic" + "github.com/observiq/stanza/database" "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" "go.etcd.io/bbolt" @@ -50,7 +51,7 @@ func (c MemoryBufferConfig) Build(context operator.BuildContext, pluginID string // at which point it saves the entries into a database. It provides no guarantees about // lost entries if shut down uncleanly. 
type MemoryBuffer struct { - db operator.Database + db database.Database pluginID string buf chan *entry.Entry inFlight map[uint64]*entry.Entry diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index b0e94de2d..75ddb00dc 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -32,7 +32,7 @@ func init() { func NewInputConfig(operatorID string) *InputConfig { return &InputConfig{ InputConfig: helper.NewInputConfig(operatorID, "file_input"), - PollInterval: operator.Duration{Duration: 200 * time.Millisecond}, + PollInterval: helper.Duration{Duration: 200 * time.Millisecond}, IncludeFileName: true, IncludeFilePath: false, StartAt: "end", @@ -48,13 +48,13 @@ type InputConfig struct { Include []string `json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `json:"exclude,omitempty" yaml:"exclude,omitempty"` - PollInterval operator.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` - Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` - IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` - IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` - StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` - MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` - Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"` + PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"` + IncludeFileName bool `json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` + IncludeFilePath bool `json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` + StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` + MaxLogSize int `json:"max_log_size,omitempty" 
yaml:"max_log_size,omitempty"` + Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"` } // MultilineConfig is the configuration a multiline operation diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 6bac6c827..0ccdcd4fd 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -14,6 +14,7 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -27,7 +28,7 @@ func newTestFileSource(t *testing.T) (*InputOperator, chan *entry.Entry) { }) cfg := NewInputConfig("testfile") - cfg.PollInterval = operator.Duration{Duration: 50 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} cfg.StartAt = "beginning" cfg.Include = []string{"should-be-overwritten"} @@ -50,7 +51,7 @@ func TestFileSource_Build(t *testing.T) { cfg.OutputIDs = []string{"mock"} cfg.Include = []string{"/var/log/testpath.*"} cfg.Exclude = []string{"/var/log/testpath.ex*"} - cfg.PollInterval = operator.Duration{Duration: 10 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond} return cfg } diff --git a/operator/builtin/input/generate/generate_test.go b/operator/builtin/input/generate/generate_test.go index a36e8588c..5485f81eb 100644 --- a/operator/builtin/input/generate/generate_test.go +++ b/operator/builtin/input/generate/generate_test.go @@ -8,6 +8,7 @@ import ( "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" + "github.com/observiq/stanza/plugin" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -66,7 +67,7 @@ pipeline: tmpl, err := template.New("my_generator").Parse(templateText) require.NoError(t, 
err) - registry := operator.PluginRegistry{ + registry := plugin.Registry{ "sample": tmpl, } @@ -76,7 +77,7 @@ pipeline: config, err := registry.Render("sample", params) require.NoError(t, err) - expectedConfig := operator.PluginConfig{ + expectedConfig := plugin.Plugin{ Pipeline: []operator.Config{ { Builder: &GenerateInputConfig{ diff --git a/operator/builtin/input/windows/operator.go b/operator/builtin/input/windows/operator.go index 69926f22a..388920d5f 100644 --- a/operator/builtin/input/windows/operator.go +++ b/operator/builtin/input/windows/operator.go @@ -19,10 +19,10 @@ func init() { // EventLogConfig is the configuration of a windows event log operator. type EventLogConfig struct { helper.InputConfig `yaml:",inline"` - Channel string `json:"channel" yaml:"channel"` - MaxReads int `json:"max_reads,omitempty" yaml:"max_reads,omitempty"` - StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` - PollInterval operator.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Channel string `json:"channel" yaml:"channel"` + MaxReads int `json:"max_reads,omitempty" yaml:"max_reads,omitempty"` + StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"` + PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` } // Build will build a windows event log operator. 
@@ -64,7 +64,7 @@ func NewDefaultConfig() operator.Builder { InputConfig: helper.NewInputConfig("", "windows_eventlog_input"), MaxReads: 100, StartAt: "end", - PollInterval: operator.Duration{ + PollInterval: helper.Duration{ Duration: 1 * time.Second, }, } @@ -79,7 +79,7 @@ type EventLogInput struct { channel string maxReads int startAt string - pollInterval operator.Duration + pollInterval helper.Duration offsets helper.Persister cancel context.CancelFunc wg *sync.WaitGroup diff --git a/operator/builtin/output/drop/drop_test.go b/operator/builtin/output/drop/drop_test.go index f40ce8f89..c42b00a78 100644 --- a/operator/builtin/output/drop/drop_test.go +++ b/operator/builtin/output/drop/drop_test.go @@ -1,16 +1,73 @@ package drop import ( - "github.com/observiq/stanza/operator/helper" + "context" + "fmt" + "testing" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" ) -func newFakeNullOutput() *DropOutput { - return &DropOutput{ - OutputOperator: helper.OutputOperator{ - BasicOperator: helper.BasicOperator{ - OperatorID: "testnull", - OperatorType: "drop_output", - }, - }, +func NewTestConfig(t *testing.T) (*operator.Config, error) { + json := `{ + "type": "drop_output", + "id": "test_id" + }` + config := &operator.Config{} + err := config.UnmarshalJSON([]byte(json)) + return config, err +} + +func NewTestOutput(t *testing.T) (*DropOutput, error) { + config, err := NewTestConfig(t) + if err != nil { + return nil, err + } + + ctx := testutil.NewBuildContext(t) + op, err := config.Build(ctx) + if err != nil { + return nil, err + } + + output, ok := op.(*DropOutput) + if !ok { + return nil, fmt.Errorf("operator is not a drop output") } + + return output, nil +} + +func TestBuildValid(t *testing.T) { + cfg, err := NewTestConfig(t) + require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + output, err := cfg.Build(ctx) + require.NoError(t, err) + 
require.IsType(t, &DropOutput{}, output) +} + +func TestBuildInvalid(t *testing.T) { + cfg, err := NewTestConfig(t) + require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + ctx.Logger = nil + _, err = cfg.Build(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "build context is missing a logger") +} + +func TestProcess(t *testing.T) { + output, err := NewTestOutput(t) + require.NoError(t, err) + + entry := entry.New() + ctx := context.Background() + result := output.Process(ctx, entry) + require.Nil(t, result) } diff --git a/operator/builtin/output/googlecloud/google_cloud.go b/operator/builtin/output/googlecloud/google_cloud.go index 26a609da2..65e264485 100644 --- a/operator/builtin/output/googlecloud/google_cloud.go +++ b/operator/builtin/output/googlecloud/google_cloud.go @@ -35,7 +35,7 @@ func NewGoogleCloudOutputConfig(operatorID string) *GoogleCloudOutputConfig { OutputConfig: helper.NewOutputConfig(operatorID, "google_cloud_output"), BufferConfig: buffer.NewConfig(), FlusherConfig: flusher.NewConfig(), - Timeout: operator.Duration{Duration: 30 * time.Second}, + Timeout: helper.Duration{Duration: 30 * time.Second}, UseCompression: true, } } @@ -46,14 +46,14 @@ type GoogleCloudOutputConfig struct { BufferConfig buffer.Config `json:"buffer,omitempty" yaml:"buffer,omitempty"` FlusherConfig flusher.Config `json:"flusher,omitempty" yaml:"flusher,omitempty"` - Credentials string `json:"credentials,omitempty" yaml:"credentials,omitempty"` - CredentialsFile string `json:"credentials_file,omitempty" yaml:"credentials_file,omitempty"` - ProjectID string `json:"project_id" yaml:"project_id"` - LogNameField *entry.Field `json:"log_name_field,omitempty" yaml:"log_name_field,omitempty"` - TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"` - SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"` - Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` - 
UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"` + Credentials string `json:"credentials,omitempty" yaml:"credentials,omitempty"` + CredentialsFile string `json:"credentials_file,omitempty" yaml:"credentials_file,omitempty"` + ProjectID string `json:"project_id" yaml:"project_id"` + LogNameField *entry.Field `json:"log_name_field,omitempty" yaml:"log_name_field,omitempty"` + TraceField *entry.Field `json:"trace_field,omitempty" yaml:"trace_field,omitempty"` + SpanIDField *entry.Field `json:"span_id_field,omitempty" yaml:"span_id_field,omitempty"` + Timeout helper.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + UseCompression bool `json:"use_compression,omitempty" yaml:"use_compression,omitempty"` } // Build will build a google cloud output operator. diff --git a/operator/builtin/output/googlecloud/google_cloud_test.go b/operator/builtin/output/googlecloud/google_cloud_test.go index e1f628760..b0f833ad0 100644 --- a/operator/builtin/output/googlecloud/google_cloud_test.go +++ b/operator/builtin/output/googlecloud/google_cloud_test.go @@ -13,7 +13,7 @@ import ( "github.com/golang/protobuf/ptypes" tspb "github.com/golang/protobuf/ptypes/timestamp" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/require" "google.golang.org/api/option" @@ -33,7 +33,7 @@ type googleCloudTestCase struct { func googleCloudBasicConfig() *GoogleCloudOutputConfig { cfg := NewGoogleCloudOutputConfig("test_id") cfg.ProjectID = "test_project_id" - cfg.FlusherConfig.MaxWait = operator.Duration{Duration: 10 * time.Millisecond} + cfg.FlusherConfig.MaxWait = helper.Duration{Duration: 10 * time.Millisecond} return cfg } @@ -392,7 +392,7 @@ func (g *googleCloudOutputBenchmark) Run(b *testing.B) { cfg := NewGoogleCloudOutputConfig(g.name) cfg.ProjectID = "test_project_id" - cfg.FlusherConfig.MaxWait = 
operator.NewDuration(10 * time.Millisecond) + cfg.FlusherConfig.MaxWait = helper.NewDuration(10 * time.Millisecond) if g.configMod != nil { g.configMod(cfg) } diff --git a/operator/builtin/parser/json/json_test.go b/operator/builtin/parser/json/json_test.go index 1b19b2967..322e6bb2c 100644 --- a/operator/builtin/parser/json/json_test.go +++ b/operator/builtin/parser/json/json_test.go @@ -2,6 +2,7 @@ package json import ( "context" + "fmt" "testing" "time" @@ -16,6 +17,88 @@ import ( "go.uber.org/zap" ) +func NewTestConfig(t *testing.T) (*operator.Config, error) { + json := `{ + "type": "json_parser", + "id": "test_id", + "output": "test_output" + }` + config := &operator.Config{} + err := config.UnmarshalJSON([]byte(json)) + return config, err +} + +func NewTestParser(t *testing.T) (*JSONParser, error) { + config, err := NewTestConfig(t) + if err != nil { + return nil, err + } + + ctx := testutil.NewBuildContext(t) + op, err := config.Build(ctx) + if err != nil { + return nil, err + } + + parser, ok := op.(*JSONParser) + if !ok { + return nil, fmt.Errorf("operator is not a json parser") + } + + return parser, nil +} + +func TestJSONParserConfigBuild(t *testing.T) { + config, err := NewTestConfig(t) + require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + parser, err := config.Build(ctx) + require.NoError(t, err) + require.IsType(t, &JSONParser{}, parser) +} + +func TestJSONParserConfigBuildFailure(t *testing.T) { + config, err := NewTestConfig(t) + require.NoError(t, err) + + parserConfig, ok := config.Builder.(*JSONParserConfig) + require.True(t, ok) + + parserConfig.OnError = "invalid_on_error" + ctx := testutil.NewBuildContext(t) + _, err = config.Build(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid `on_error` field") +} + +func TestJSONParserStringFailure(t *testing.T) { + parser, err := NewTestParser(t) + require.NoError(t, err) + + _, err = parser.parse("invalid") + require.Error(t, err) + require.Contains(t, err.Error(), 
"error found in #1 byte") +} + +func TestJSONParserByteFailure(t *testing.T) { + parser, err := NewTestParser(t) + require.NoError(t, err) + + _, err = parser.parse([]byte("invalid")) + require.Error(t, err) + require.Contains(t, err.Error(), "error found in #1 byte") +} + +func TestJSONParserInvalidType(t *testing.T) { + parser, err := NewTestParser(t) + require.NoError(t, err) + + _, err = parser.parse([]int{}) + require.Error(t, err) + require.Contains(t, err.Error(), "type []int cannot be parsed as JSON") +} + func NewFakeJSONOperator() (*JSONParser, *testutil.Operator) { mock := testutil.Operator{} logger, _ := zap.NewProduction() diff --git a/operator/builtin/parser/regex/regex_test.go b/operator/builtin/parser/regex/regex_test.go index 0fa02f4d5..eec17b799 100644 --- a/operator/builtin/parser/regex/regex_test.go +++ b/operator/builtin/parser/regex/regex_test.go @@ -2,6 +2,7 @@ package regex import ( "context" + "fmt" "regexp" "testing" @@ -13,6 +14,80 @@ import ( "github.com/stretchr/testify/require" ) +func NewTestConfig(t *testing.T, regex string) (*operator.Config, error) { + json := `{ + "type": "regex_parser", + "id": "test_id", + "regex": "%s", + "output": "test_output" + }` + json = fmt.Sprintf(json, regex) + config := &operator.Config{} + err := config.UnmarshalJSON([]byte(json)) + return config, err +} + +func NewTestParser(t *testing.T, regex string) (*RegexParser, error) { + config, err := NewTestConfig(t, regex) + if err != nil { + return nil, err + } + + ctx := testutil.NewBuildContext(t) + op, err := config.Build(ctx) + if err != nil { + return nil, err + } + + parser, ok := op.(*RegexParser) + if !ok { + return nil, fmt.Errorf("operator is not a regex parser") + } + + return parser, nil +} + +func TestRegexParserBuildFailure(t *testing.T) { + config, err := NewTestConfig(t, "^(?Ptest)") + require.NoError(t, err) + + parserConfig, ok := config.Builder.(*RegexParserConfig) + require.True(t, ok) + + parserConfig.OnError = "invalid_on_error" + ctx 
:= testutil.NewBuildContext(t) + _, err = config.Build(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid `on_error` field") +} + +func TestRegexParserStringFailure(t *testing.T) { + parser, err := NewTestParser(t, "^(?Ptest)") + require.NoError(t, err) + + _, err = parser.parse("invalid") + require.Error(t, err) + require.Contains(t, err.Error(), "regex pattern does not match") +} + +func TestRegexParserByteFailure(t *testing.T) { + parser, err := NewTestParser(t, "^(?Ptest)") + require.NoError(t, err) + + _, err = parser.parse([]byte("invalid")) + require.Error(t, err) + require.Contains(t, err.Error(), "regex pattern does not match") +} + +func TestRegexParserInvalidType(t *testing.T) { + parser, err := NewTestParser(t, "^(?Ptest)") + require.NoError(t, err) + + _, err = parser.parse([]int{}) + require.Error(t, err) + require.Contains(t, err.Error(), "type '[]int' cannot be parsed as regex") +} + func newFakeRegexParser() (*RegexParser, *testutil.Operator) { mockOperator := testutil.Operator{} return &RegexParser{ diff --git a/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go b/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go index 189220afc..76194fd31 100644 --- a/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go +++ b/operator/builtin/transformer/k8smetadata/k8s_metadata_decorator.go @@ -24,18 +24,18 @@ func NewK8sMetadataDecoratorConfig(operatorID string) *K8sMetadataDecoratorConfi TransformerConfig: helper.NewTransformerConfig(operatorID, "k8s_metadata_decorator"), PodNameField: entry.NewResourceField("k8s.pod.name"), NamespaceField: entry.NewResourceField("k8s.namespace.name"), - CacheTTL: operator.Duration{Duration: 10 * time.Minute}, - Timeout: operator.Duration{Duration: 10 * time.Second}, + CacheTTL: helper.Duration{Duration: 10 * time.Minute}, + Timeout: helper.Duration{Duration: 10 * time.Second}, } } // K8sMetadataDecoratorConfig is the configuration of k8s_metadata_decorator 
operator type K8sMetadataDecoratorConfig struct { helper.TransformerConfig `yaml:",inline"` - PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"` - NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"` - CacheTTL operator.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"` - Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"` + NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"` + CacheTTL helper.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"` + Timeout helper.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` } // Build will build a k8s_metadata_decorator operator from the supplied configuration diff --git a/operator/builtin/transformer/noop/noop_test.go b/operator/builtin/transformer/noop/noop_test.go index a3d7a4c4f..afc9cceb3 100644 --- a/operator/builtin/transformer/noop/noop_test.go +++ b/operator/builtin/transformer/noop/noop_test.go @@ -1,16 +1,85 @@ package noop import ( + "context" + "fmt" "testing" + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestNoopOperatorBuild(t *testing.T) { - cfg := NewNoopOperatorConfig("test_operator_id") - cfg.OutputIDs = []string{"output"} +func NewTestConfig(t *testing.T) (*operator.Config, error) { + json := `{ + "type": "noop", + "id": "test_id", + "output": "test_output" + }` + config := &operator.Config{} + err := config.UnmarshalJSON([]byte(json)) + return config, err +} + +func NewTestOperator(t *testing.T) (*NoopOperator, error) { + config, err := NewTestConfig(t) + if err != nil { + return nil, err + } + + ctx := testutil.NewBuildContext(t) + op, err := config.Build(ctx) + if err != nil { + 
return nil, err + } + + noop, ok := op.(*NoopOperator) + if !ok { + return nil, fmt.Errorf("operator is not a noop operator") + } + + return noop, nil +} + +func TestBuildValid(t *testing.T) { + cfg, err := NewTestConfig(t) + require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + output, err := cfg.Build(ctx) + require.NoError(t, err) + require.IsType(t, &NoopOperator{}, output) +} - _, err := cfg.Build(testutil.NewBuildContext(t)) +func TestBuildInvalid(t *testing.T) { + cfg, err := NewTestConfig(t) require.NoError(t, err) + + ctx := testutil.NewBuildContext(t) + ctx.Logger = nil + _, err = cfg.Build(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "build context is missing a logger") +} + +func TestProcess(t *testing.T) { + noop, err := NewTestOperator(t) + require.NoError(t, err) + + var processedEntry interface{} + mockOutput := testutil.NewMockOperator("test_output") + mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { processedEntry = args[1] }).Return(nil) + noop.OutputOperators = []operator.Operator{mockOutput} + + entry := entry.New() + entry.AddLabel("label", "value") + entry.AddResourceKey("resource", "value") + + expected := entry.Copy() + ctx := context.Background() + result := noop.Process(ctx, entry) + require.Nil(t, result) + require.Equal(t, expected, processedEntry) } diff --git a/operator/builtin/transformer/ratelimit/rate_limit.go b/operator/builtin/transformer/ratelimit/rate_limit.go index 42056096e..992da702e 100644 --- a/operator/builtin/transformer/ratelimit/rate_limit.go +++ b/operator/builtin/transformer/ratelimit/rate_limit.go @@ -25,9 +25,9 @@ func NewRateLimitConfig(operatorID string) *RateLimitConfig { type RateLimitConfig struct { helper.TransformerConfig `yaml:",inline"` - Rate float64 `json:"rate,omitempty" yaml:"rate,omitempty"` - Interval operator.Duration `json:"interval,omitempty" yaml:"interval,omitempty"` - Burst uint `json:"burst,omitempty" yaml:"burst,omitempty"` + 
Rate float64 `json:"rate,omitempty" yaml:"rate,omitempty"` + Interval helper.Duration `json:"interval,omitempty" yaml:"interval,omitempty"` + Burst uint `json:"burst,omitempty" yaml:"burst,omitempty"` } // Build will build a rate limit operator. diff --git a/operator/config.go b/operator/config.go index 6236aa7eb..9ae23f53d 100644 --- a/operator/config.go +++ b/operator/config.go @@ -1,10 +1,12 @@ +//go:generate mockery --name=Builder --output=../testutil --outpkg=testutil --filename=operator_builder.go --structname=OperatorBuilder + package operator import ( "encoding/json" "fmt" - "go.etcd.io/bbolt" + "github.com/observiq/stanza/database" "go.uber.org/zap" ) @@ -23,40 +25,9 @@ type Builder interface { // BuildContext supplies contextual resources when building an operator. type BuildContext struct { - PluginRegistry PluginRegistry - Database Database - Parameters map[string]interface{} - Logger *zap.SugaredLogger -} - -// Database is a database used to save offsets -type Database interface { - Close() error - Sync() error - Update(func(*bbolt.Tx) error) error - View(func(*bbolt.Tx) error) error -} - -// StubDatabase is an implementation of Database that -// succeeds on all calls without persisting anything to disk. -// This is used when --database is unspecified. 
-type StubDatabase struct{} - -// Close will be ignored by the stub database -func (d *StubDatabase) Close() error { return nil } - -// Sync will be ignored by the stub database -func (d *StubDatabase) Sync() error { return nil } - -// Update will be ignored by the stub database -func (d *StubDatabase) Update(func(tx *bbolt.Tx) error) error { return nil } - -// View will be ignored by the stub database -func (d *StubDatabase) View(func(tx *bbolt.Tx) error) error { return nil } - -// NewStubDatabase creates a new StubDatabase -func NewStubDatabase() *StubDatabase { - return &StubDatabase{} + Database database.Database + Parameters map[string]interface{} + Logger *zap.SugaredLogger } // registry is a global registry of operator types to operator builders. diff --git a/operator/config_test.go b/operator/config_test.go index d8930b3d0..315a56288 100644 --- a/operator/config_test.go +++ b/operator/config_test.go @@ -8,22 +8,6 @@ import ( yaml "gopkg.in/yaml.v2" ) -func TestStubDatabase(t *testing.T) { - stub := &StubDatabase{} - - err := stub.Close() - require.NoError(t, err) - - err = stub.Sync() - require.NoError(t, err) - - err = stub.Update(nil) - require.NoError(t, err) - - err = stub.View(nil) - require.NoError(t, err) -} - type FakeBuilder struct { OperatorID string `json:"id" yaml:"id"` OperatorType string `json:"type" yaml:"type"` @@ -36,10 +20,19 @@ func (f *FakeBuilder) ID() string { return "pl func (f *FakeBuilder) Type() string { return "plugin" } func TestUnmarshalJSONErrors(t *testing.T) { + t.Run("ValidJSON", func(t *testing.T) { + Register("fake_operator", func() Builder { return &FakeBuilder{} }) + raw := `{"type":"fake_operator"}` + cfg := &Config{} + err := cfg.UnmarshalJSON([]byte(raw)) + require.NoError(t, err) + require.IsType(t, &FakeBuilder{}, cfg.Builder) + }) + t.Run("InvalidJSON", func(t *testing.T) { raw := `{}}` - var cfg Config - err := json.Unmarshal([]byte(raw), &cfg) + cfg := &Config{} + err := cfg.UnmarshalJSON([]byte(raw)) 
require.Error(t, err) require.Contains(t, err.Error(), "invalid") }) @@ -85,6 +78,15 @@ func TestMarshalJSON(t *testing.T) { } func TestUnmarshalYAMLErrors(t *testing.T) { + t.Run("ValidYAML", func(t *testing.T) { + Register("fake_operator", func() Builder { return &FakeBuilder{} }) + raw := `type: fake_operator` + var cfg Config + err := yaml.Unmarshal([]byte(raw), &cfg) + require.NoError(t, err) + require.IsType(t, &FakeBuilder{}, cfg.Builder) + }) + t.Run("InvalidYAML", func(t *testing.T) { raw := `-- - \n||\\` var cfg Config @@ -140,3 +142,9 @@ func TestMarshalYAML(t *testing.T) { expected := "id: plugin\ntype: plugin\narray:\n- test\n" require.Equal(t, expected, string(out)) } + +func TestIsDefined(t *testing.T) { + Register("fake_operator", func() Builder { return &FakeBuilder{} }) + require.True(t, IsDefined("fake_operator")) +} + diff --git a/operator/flusher/flusher.go b/operator/flusher/flusher.go index 52b910e55..cbbaf96df 100644 --- a/operator/flusher/flusher.go +++ b/operator/flusher/flusher.go @@ -8,8 +8,8 @@ import ( backoff "github.com/cenkalti/backoff/v4" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/buffer" + "github.com/observiq/stanza/operator/helper" "go.uber.org/zap" "golang.org/x/sync/semaphore" ) @@ -22,7 +22,7 @@ type Config struct { // MaxWait is the maximum amount of time to wait for a full slice of entries // before flushing the entries. Defaults to 1s. - MaxWait operator.Duration `json:"max_wait" yaml:"max_wait"` + MaxWait helper.Duration `json:"max_wait" yaml:"max_wait"` // MaxChunkEntries is the maximum number of entries to flush at a time. // Defaults to 1000. 
@@ -35,7 +35,7 @@ type Config struct { func NewConfig() Config { return Config{ MaxConcurrent: 16, - MaxWait: operator.Duration{ + MaxWait: helper.Duration{ Duration: time.Second, }, MaxChunkEntries: 1000, diff --git a/operator/flusher/flusher_test.go b/operator/flusher/flusher_test.go index a1e56a105..d10d1d26f 100644 --- a/operator/flusher/flusher_test.go +++ b/operator/flusher/flusher_test.go @@ -6,8 +6,8 @@ import ( "time" "github.com/observiq/stanza/entry" - "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/buffer" + "github.com/observiq/stanza/operator/helper" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/require" ) @@ -31,7 +31,7 @@ func TestFlusher(t *testing.T) { } flusherCfg := NewConfig() - flusherCfg.MaxWait = operator.Duration{ + flusherCfg.MaxWait = helper.Duration{ Duration: 10 * time.Millisecond, } flusher := flusherCfg.Build(buf, flushFunc, nil) diff --git a/operator/duration.go b/operator/helper/duration.go similarity index 98% rename from operator/duration.go rename to operator/helper/duration.go index ef112e170..a72913b43 100644 --- a/operator/duration.go +++ b/operator/helper/duration.go @@ -1,4 +1,4 @@ -package operator +package helper import ( "encoding/json" diff --git a/operator/duration_test.go b/operator/helper/duration_test.go similarity index 98% rename from operator/duration_test.go rename to operator/helper/duration_test.go index 8c5c85d3d..9d2461283 100644 --- a/operator/duration_test.go +++ b/operator/helper/duration_test.go @@ -1,4 +1,4 @@ -package operator +package helper import ( "encoding/json" diff --git a/operator/helper/persister.go b/operator/helper/persister.go index ec7a096e7..89018460a 100644 --- a/operator/helper/persister.go +++ b/operator/helper/persister.go @@ -3,7 +3,7 @@ package helper import ( "sync" - "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/database" "go.etcd.io/bbolt" ) @@ -18,13 +18,13 @@ type Persister interface { // ScopedBBoltPersister 
is a persister that uses a database for the backend type ScopedBBoltPersister struct { scope []byte - db operator.Database + db database.Database cache map[string][]byte cacheMux sync.Mutex } // NewScopedDBPersister returns a new ScopedBBoltPersister -func NewScopedDBPersister(db operator.Database, scope string) *ScopedBBoltPersister { +func NewScopedDBPersister(db database.Database, scope string) *ScopedBBoltPersister { return &ScopedBBoltPersister{ scope: []byte(scope), db: db, diff --git a/operator/helper/persister_test.go b/operator/helper/persister_test.go new file mode 100644 index 000000000..85eb44fc9 --- /dev/null +++ b/operator/helper/persister_test.go @@ -0,0 +1,35 @@ +package helper + +import ( + "path/filepath" + "testing" + + "github.com/observiq/stanza/database" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" +) + +func TestPersisterCache(t *testing.T) { + stubDatabase := database.NewStubDatabase() + persister := NewScopedDBPersister(stubDatabase, "test") + persister.Set("key", []byte("value")) + value := persister.Get("key") + require.Equal(t, []byte("value"), value) +} + +func TestPersisterLoad(t *testing.T) { + tempDir := testutil.NewTempDir(t) + db, err := database.OpenDatabase(filepath.Join(tempDir, "test.db")) + persister := NewScopedDBPersister(db, "test") + persister.Set("key", []byte("value")) + + err = persister.Sync() + require.NoError(t, err) + + newPersister := NewScopedDBPersister(db, "test") + err = newPersister.Load() + require.NoError(t, err) + + value := newPersister.Get("key") + require.Equal(t, []byte("value"), value) +} diff --git a/operator/operator.go b/operator/operator.go index 11cf2e906..993a9b1bf 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -1,4 +1,4 @@ -//go:generate mockery -name=^(Operator)$ -output=../testutil -outpkg=testutil -case=snake +//go:generate mockery --name=^(Operator)$ --output=../testutil --outpkg=testutil --case=snake package operator diff --git 
a/pipeline/config.go b/pipeline/config.go index 445a5dfef..00b76f6bc 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -7,6 +7,7 @@ import ( "github.com/observiq/stanza/errors" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" + "github.com/observiq/stanza/plugin" yaml "gopkg.in/yaml.v2" ) @@ -14,8 +15,8 @@ import ( type Config []Params // BuildPipeline will build a pipeline from the config. -func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput operator.Operator) (*Pipeline, error) { - operatorConfigs, err := c.buildOperatorConfigs(context.PluginRegistry) +func (c Config) BuildPipeline(context operator.BuildContext, pluginRegistry plugin.Registry, defaultOutput operator.Operator) (*DirectedPipeline, error) { + operatorConfigs, err := c.buildOperatorConfigs(pluginRegistry) if err != nil { return nil, err } @@ -29,7 +30,7 @@ func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput opera operators = append(operators, defaultOutput) } - pipeline, err := NewPipeline(operators) + pipeline, err := NewDirectedPipeline(operators) if err != nil { return nil, err } @@ -37,7 +38,7 @@ func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput opera return pipeline, nil } -func (c Config) buildOperatorConfigs(pluginRegistry operator.PluginRegistry) ([]operator.Config, error) { +func (c Config) buildOperatorConfigs(pluginRegistry plugin.Registry) ([]operator.Config, error) { operatorConfigs := make([]operator.Config, 0, len(c)) for i, params := range c { @@ -192,7 +193,7 @@ func (p Params) getStringArray(key string) []string { } // BuildConfigs will build operator configs from a params map. 
-func (p Params) BuildConfigs(pluginRegistry operator.PluginRegistry, namespace string, defaultOutput []string) ([]operator.Config, error) { +func (p Params) BuildConfigs(pluginRegistry plugin.Registry, namespace string, defaultOutput []string) ([]operator.Config, error) { if operator.IsDefined(p.Type()) { return p.buildAsBuiltin(namespace) } @@ -230,7 +231,7 @@ func (p Params) buildAsBuiltin(namespace string) ([]operator.Config, error) { } // buildPlugin will build a plugin config from a params map. -func (p Params) buildPlugin(pluginRegistry operator.PluginRegistry, namespace string, defaultOutput []string) ([]operator.Config, error) { +func (p Params) buildPlugin(pluginRegistry plugin.Registry, namespace string, defaultOutput []string) ([]operator.Config, error) { templateParams := map[string]interface{}{} for key, value := range p { templateParams[key] = value diff --git a/pipeline/config_test.go b/pipeline/config_test.go index 0bd0d6ccc..2daeda78a 100644 --- a/pipeline/config_test.go +++ b/pipeline/config_test.go @@ -5,10 +5,13 @@ import ( "fmt" "testing" + "github.com/observiq/stanza/operator" _ "github.com/observiq/stanza/operator/builtin/input/generate" "github.com/observiq/stanza/operator/builtin/output/drop" _ "github.com/observiq/stanza/operator/builtin/transformer/noop" + "github.com/observiq/stanza/plugin" "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" yaml "gopkg.in/yaml.v2" ) @@ -199,7 +202,8 @@ pipeline: message: test output: {{.output}} ` - err := context.PluginRegistry.Add("plugin", pluginTemplate) + registry := plugin.Registry{} + err := registry.Add("plugin", pluginTemplate) require.NoError(t, err) pipelineConfig := Config{ @@ -214,7 +218,7 @@ pipeline: }, } - _, err = pipelineConfig.BuildPipeline(context, nil) + _, err = pipelineConfig.BuildPipeline(context, registry, nil) require.NoError(t, err) } @@ -237,7 +241,7 @@ func TestBuildValidPipelineDefaultOutput(t *testing.T) { 
defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context) require.NoError(t, err) - pl, err := pipelineConfig.BuildPipeline(context, defaultOutput) + pl, err := pipelineConfig.BuildPipeline(context, nil, defaultOutput) require.NoError(t, err) require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.generate_input"), createNodeID("$.drop_it"))) } @@ -265,7 +269,7 @@ func TestBuildValidPipelineNextOutputAndDefaultOutput(t *testing.T) { defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context) require.NoError(t, err) - pl, err := pipelineConfig.BuildPipeline(context, defaultOutput) + pl, err := pipelineConfig.BuildPipeline(context, nil, defaultOutput) require.NoError(t, err) require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.generate_input"), createNodeID("$.noop"))) require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.noop"), createNodeID("$.drop_it"))) @@ -282,7 +286,8 @@ pipeline: record: message: test ` - err := context.PluginRegistry.Add("plugin", pluginTemplate) + registry := plugin.Registry{} + err := registry.Add("plugin", pluginTemplate) require.NoError(t, err) pipelineConfig := Config{ @@ -295,7 +300,7 @@ pipeline: defaultOutput, err := drop.NewDropOutputConfig("$.drop_it").Build(context) require.NoError(t, err) - pl, err := pipelineConfig.BuildPipeline(context, defaultOutput) + pl, err := pipelineConfig.BuildPipeline(context, registry, defaultOutput) require.NoError(t, err) require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.plugin.plugin_generate"), createNodeID("$.drop_it"))) } @@ -315,7 +320,7 @@ func TestBuildInvalidPipelineInvalidType(t *testing.T) { }, } - _, err := pipelineConfig.BuildPipeline(context, nil) + _, err := pipelineConfig.BuildPipeline(context, nil, nil) require.Error(t, err) require.Contains(t, err.Error(), "unsupported `type` for operator config") } @@ -331,7 +336,8 @@ pipeline: message: test output: {{.output}} ` - err := context.PluginRegistry.Add("plugin", pluginTemplate) + registry := 
plugin.Registry{} + err := registry.Add("plugin", pluginTemplate) require.NoError(t, err) pipelineConfig := Config{ @@ -346,7 +352,7 @@ pipeline: }, } - _, err = pipelineConfig.BuildPipeline(context, nil) + _, err = pipelineConfig.BuildPipeline(context, registry, nil) require.Error(t, err) require.Contains(t, err.Error(), "build operator configs") } @@ -366,7 +372,7 @@ func TestBuildInvalidPipelineInvalidOperator(t *testing.T) { } context := testutil.NewBuildContext(t) - _, err := pipelineConfig.BuildPipeline(context, nil) + _, err := pipelineConfig.BuildPipeline(context, nil, nil) require.Error(t, err) require.Contains(t, err.Error(), "field number not found") } @@ -391,7 +397,7 @@ func TestBuildInvalidPipelineInvalidGraph(t *testing.T) { } context := testutil.NewBuildContext(t) - _, err := pipelineConfig.BuildPipeline(context, nil) + _, err := pipelineConfig.BuildPipeline(context, nil, nil) require.Error(t, err) require.Contains(t, err.Error(), "does not exist") } @@ -411,7 +417,8 @@ pipeline: record: test output: {{.output}} ` - err := context.PluginRegistry.Add("plugin", pluginTemplate) + registry := plugin.Registry{} + err := registry.Add("plugin", pluginTemplate) require.NoError(t, err) config := Config{ @@ -425,7 +432,7 @@ pipeline: }, } - configs, err := config.buildOperatorConfigs(context.PluginRegistry) + configs, err := config.buildOperatorConfigs(registry) require.NoError(t, err) require.Len(t, configs, 3) @@ -502,3 +509,67 @@ func TestMultiRoundtripParams(t *testing.T) { require.Equal(t, marshalledYaml, marshalledYaml2) } } + +func TestBuildPipelineWithFailingOperator(t *testing.T) { + ctx := testutil.NewBuildContext(t) + + type invalidOperatorConfig struct { + OperatorType string `json:"type" yaml:"type"` + testutil.OperatorBuilder + } + + newBuilder := func() operator.Builder { + config := &invalidOperatorConfig{} + config.On("Build", mock.Anything).Return(nil, fmt.Errorf("failed to build operator")) + config.On("SetNamespace", mock.Anything, 
mock.Anything).Return() + config.On("ID").Return("test_id") + config.On("Type").Return("invalid_operator") + return config + } + + operator.Register("invalid_operator", newBuilder) + config := Config{ + {"type": "invalid_operator"}, + } + _, err := config.BuildPipeline(ctx, nil, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to build operator") +} + +func TestBuildPipelineWithInvalidParam(t *testing.T) { + ctx := testutil.NewBuildContext(t) + config := Config{ + {"missing": "type"}, + } + _, err := config.BuildPipeline(ctx, nil, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "missing required `type` field") +} + +type invalidYaml struct{} + +func (y invalidYaml) MarshalYAML() (interface{}, error) { + return nil, fmt.Errorf("invalid yaml") +} + +func TestBuildAsBuiltinWithInvalidParam(t *testing.T) { + params := Params{ + "field": invalidYaml{}, + } + _, err := params.buildAsBuiltin("test_namespace") + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse config map as yaml") +} + +func TestUnmarshalParamsWithInvalidBytes(t *testing.T) { + bytes := []byte("string") + var params Params + err := yaml.Unmarshal(bytes, ¶ms) + require.Error(t, err) + require.Contains(t, err.Error(), "unmarshal errors") +} + +func TestCleanValueWithUnknownType(t *testing.T) { + value := cleanValue(map[int]int{}) + require.Equal(t, "map[]", value) +} diff --git a/pipeline/directed.go b/pipeline/directed.go new file mode 100644 index 000000000..c785da2a4 --- /dev/null +++ b/pipeline/directed.go @@ -0,0 +1,190 @@ +package pipeline + +import ( + "fmt" + "strings" + + "github.com/observiq/stanza/errors" + "github.com/observiq/stanza/operator" + "gonum.org/v1/gonum/graph/encoding/dot" + "gonum.org/v1/gonum/graph/simple" + "gonum.org/v1/gonum/graph/topo" +) + +// DirectedPipeline is a pipeline backed by a directed graph +type DirectedPipeline struct { + Graph *simple.DirectedGraph + running bool +} + +// Start will start the operators 
in a pipeline in reverse topological order +func (p *DirectedPipeline) Start() error { + if p.running { + return nil + } + + sortedNodes, _ := topo.Sort(p.Graph) + for i := len(sortedNodes) - 1; i >= 0; i-- { + operator := sortedNodes[i].(OperatorNode).Operator() + operator.Logger().Debug("Starting operator") + if err := operator.Start(); err != nil { + return err + } + operator.Logger().Debug("Started operator") + } + + p.running = true + return nil +} + +// Stop will stop the operators in a pipeline in topological order +func (p *DirectedPipeline) Stop() error { + if !p.running { + return nil + } + + sortedNodes, _ := topo.Sort(p.Graph) + for _, node := range sortedNodes { + operator := node.(OperatorNode).Operator() + operator.Logger().Debug("Stopping operator") + _ = operator.Stop() + operator.Logger().Debug("Stopped operator") + } + + p.running = false + return nil +} + +// Running will return the running state of the pipeline +func (p *DirectedPipeline) Running() bool { + return p.running +} + +// Render will render the pipeline as a dot graph +func (p *DirectedPipeline) Render() ([]byte, error) { + return dot.Marshal(p.Graph, "G", "", " ") +} + +// addNodes will add operators as nodes to the supplied graph. +func addNodes(graph *simple.DirectedGraph, operators []operator.Operator) error { + for _, operator := range operators { + operatorNode := createOperatorNode(operator) + if graph.Node(operatorNode.ID()) != nil { + return errors.NewError( + fmt.Sprintf("operator with id '%s' already exists in pipeline", operatorNode.Operator().ID()), + "ensure that each operator has a unique `type` or `id`", + ) + } + + graph.AddNode(operatorNode) + } + return nil +} + +// connectNodes will connect the nodes in the supplied graph. 
+func connectNodes(graph *simple.DirectedGraph) error { + nodes := graph.Nodes() + for nodes.Next() { + node := nodes.Node().(OperatorNode) + if err := connectNode(graph, node); err != nil { + return err + } + } + + if _, err := topo.Sort(graph); err != nil { + return errors.NewError( + "pipeline has a circular dependency", + "ensure that all operators are connected in a straight, acyclic line", + "cycles", unorderableToCycles(err.(topo.Unorderable)), + ) + } + + return nil +} + +// connectNode will connect a node to its outputs in the supplied graph. +func connectNode(graph *simple.DirectedGraph, inputNode OperatorNode) error { + for outputOperatorID, outputNodeID := range inputNode.OutputIDs() { + if graph.Node(outputNodeID) == nil { + return errors.NewError( + "operators cannot be connected, because the output does not exist in the pipeline", + "ensure that the output operator is defined", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + outputNode := graph.Node(outputNodeID).(OperatorNode) + if !outputNode.Operator().CanProcess() { + return errors.NewError( + "operators cannot be connected, because the output operator can not process logs", + "ensure that the output operator can process logs (like a parser or destination)", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + if graph.HasEdgeFromTo(inputNode.ID(), outputNodeID) { + return errors.NewError( + "operators cannot be connected, because a connection already exists", + "ensure that only a single connection exists between the two operators", + "input_operator", inputNode.Operator().ID(), + "output_operator", outputOperatorID, + ) + } + + edge := graph.NewEdge(inputNode, outputNode) + graph.SetEdge(edge) + } + + return nil +} + +// setOperatorOutputs will set the outputs on operators that can output. 
+func setOperatorOutputs(operators []operator.Operator) error { + for _, operator := range operators { + if !operator.CanOutput() { + continue + } + + if err := operator.SetOutputs(operators); err != nil { + return errors.WithDetails(err, "operator_id", operator.ID()) + } + } + return nil +} + +// NewDirectedPipeline creates a new directed pipeline +func NewDirectedPipeline(operators []operator.Operator) (*DirectedPipeline, error) { + if err := setOperatorOutputs(operators); err != nil { + return nil, err + } + + graph := simple.NewDirectedGraph() + if err := addNodes(graph, operators); err != nil { + return nil, err + } + + if err := connectNodes(graph); err != nil { + return nil, err + } + + return &DirectedPipeline{Graph: graph}, nil +} + +func unorderableToCycles(err topo.Unorderable) string { + var cycles strings.Builder + for i, cycle := range err { + if i != 0 { + cycles.WriteByte(',') + } + cycles.WriteByte('(') + for _, node := range cycle { + cycles.WriteString(node.(OperatorNode).operator.ID()) + cycles.Write([]byte(` -> `)) + } + cycles.WriteString(cycle[0].(OperatorNode).operator.ID()) + cycles.WriteByte(')') + } + return cycles.String() +} diff --git a/pipeline/pipeline_test.go b/pipeline/directed_test.go similarity index 56% rename from pipeline/pipeline_test.go rename to pipeline/directed_test.go index 2cb68a116..92753663f 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/directed_test.go @@ -1,12 +1,14 @@ package pipeline import ( + "fmt" "testing" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap" "gonum.org/v1/gonum/graph" "gonum.org/v1/gonum/graph/simple" "gonum.org/v1/gonum/graph/topo" @@ -67,7 +69,7 @@ func TestUnorderableToCycles(t *testing.T) { func TestPipeline(t *testing.T) { t.Run("MultipleStart", func(t *testing.T) { - pipeline, err := NewPipeline([]operator.Operator{}) + pipeline, err := 
NewDirectedPipeline([]operator.Operator{}) require.NoError(t, err) err = pipeline.Start() @@ -80,7 +82,7 @@ func TestPipeline(t *testing.T) { }) t.Run("MultipleStop", func(t *testing.T) { - pipeline, err := NewPipeline([]operator.Operator{}) + pipeline, err := NewDirectedPipeline([]operator.Operator{}) require.NoError(t, err) err = pipeline.Start() @@ -98,7 +100,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return(nil) - _, err := NewPipeline([]operator.Operator{operator1, operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator1, operator2}) require.Error(t, err) require.Contains(t, err.Error(), "already exists") }) @@ -112,7 +114,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return([]operator.Operator{operator1}) - _, err := NewPipeline([]operator.Operator{operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator2}) require.Error(t, err) require.Contains(t, err.Error(), "does not exist") }) @@ -129,7 +131,7 @@ func TestPipeline(t *testing.T) { operator2.On("SetOutputs", mock.Anything).Return(nil) operator2.On("Outputs").Return([]operator.Operator{operator1}) - _, err := NewPipeline([]operator.Operator{operator1, operator2}) + _, err := NewDirectedPipeline([]operator.Operator{operator1, operator2}) require.Error(t, err) require.Contains(t, err.Error(), "can not process") }) @@ -168,8 +170,113 @@ func TestPipeline(t *testing.T) { mockOperator3.On("Outputs").Return([]operator.Operator{mockOperator1}) mockOperator3.On("SetOutputs", mock.Anything).Return(nil) - _, err := NewPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + _, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) require.Error(t, err) require.Contains(t, err.Error(), "circular dependency") }) } + +func TestPipelineStartOrder(t *testing.T) { + var mock2Started bool + 
var mock3Started bool + + mockOperator1 := testutil.NewMockOperator("operator1") + mockOperator2 := testutil.NewMockOperator("operator2") + mockOperator3 := testutil.NewMockOperator("operator3") + + mockOperator1.On("Outputs").Return([]operator.Operator{mockOperator2}) + mockOperator2.On("Outputs").Return([]operator.Operator{mockOperator3}) + mockOperator3.On("Outputs").Return(nil) + + mockOperator1.On("SetOutputs", mock.Anything).Return(nil) + mockOperator2.On("SetOutputs", mock.Anything).Return(nil) + mockOperator3.On("SetOutputs", mock.Anything).Return(nil) + + mockOperator1.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + mockOperator2.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + mockOperator3.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + + mockOperator1.On("Start").Return(fmt.Errorf("operator 1 failed to start")) + mockOperator2.On("Start").Run(func(mock.Arguments) { mock2Started = true }).Return(nil) + mockOperator3.On("Start").Run(func(mock.Arguments) { mock3Started = true }).Return(nil) + + pipeline, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + require.NoError(t, err) + + err = pipeline.Start() + require.Error(t, err) + require.Contains(t, err.Error(), "operator 1 failed to start") + require.True(t, mock2Started) + require.True(t, mock3Started) +} + +func TestPipelineStopOrder(t *testing.T) { + stopOrder := []int{} + + mockOperator1 := testutil.NewMockOperator("operator1") + mockOperator2 := testutil.NewMockOperator("operator2") + mockOperator3 := testutil.NewMockOperator("operator3") + + mockOperator1.On("Outputs").Return([]operator.Operator{mockOperator2}) + mockOperator2.On("Outputs").Return([]operator.Operator{mockOperator3}) + mockOperator3.On("Outputs").Return(nil) + + mockOperator1.On("SetOutputs", mock.Anything).Return(nil) + mockOperator2.On("SetOutputs", mock.Anything).Return(nil) + mockOperator3.On("SetOutputs", mock.Anything).Return(nil) + + 
mockOperator1.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + mockOperator2.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + mockOperator3.On("Logger", mock.Anything).Return(zap.NewNop().Sugar()) + + mockOperator1.On("Start").Return(nil) + mockOperator2.On("Start").Return(nil) + mockOperator3.On("Start").Return(nil) + + mockOperator1.On("Stop").Run(func(mock.Arguments) { stopOrder = append(stopOrder, 1) }).Return(nil) + mockOperator2.On("Stop").Run(func(mock.Arguments) { stopOrder = append(stopOrder, 2) }).Return(nil) + mockOperator3.On("Stop").Run(func(mock.Arguments) { stopOrder = append(stopOrder, 3) }).Return(nil) + + pipeline, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + require.NoError(t, err) + + err = pipeline.Start() + require.NoError(t, err) + require.True(t, pipeline.Running()) + + err = pipeline.Stop() + require.NoError(t, err) + require.False(t, pipeline.Running()) + require.Equal(t, []int{1, 2, 3}, stopOrder) +} + +func TestPipelineRender(t *testing.T) { + mockOperator1 := testutil.NewMockOperator("operator1") + mockOperator2 := testutil.NewMockOperator("operator2") + mockOperator3 := testutil.NewMockOperator("operator3") + + mockOperator1.On("Outputs").Return([]operator.Operator{mockOperator2}) + mockOperator2.On("Outputs").Return([]operator.Operator{mockOperator3}) + mockOperator3.On("Outputs").Return(nil) + + mockOperator1.On("SetOutputs", mock.Anything).Return(nil) + mockOperator2.On("SetOutputs", mock.Anything).Return(nil) + mockOperator3.On("SetOutputs", mock.Anything).Return(nil) + + pipeline, err := NewDirectedPipeline([]operator.Operator{mockOperator1, mockOperator2, mockOperator3}) + require.NoError(t, err) + + dotGraph, err := pipeline.Render() + require.NoError(t, err) + expected := `strict digraph G { + // Node definitions. + operator1; + operator3; + operator2; + + // Edge definitions. 
+ operator1 -> operator2; + operator2 -> operator3; +}` + require.Equal(t, expected, string(dotGraph)) +} diff --git a/pipeline/node_test.go b/pipeline/node_test.go new file mode 100644 index 000000000..9002bae08 --- /dev/null +++ b/pipeline/node_test.go @@ -0,0 +1,22 @@ +package pipeline + +import ( + "testing" + + _ "github.com/observiq/stanza/operator/builtin/input/generate" + _ "github.com/observiq/stanza/operator/builtin/transformer/noop" + "github.com/observiq/stanza/testutil" + "github.com/stretchr/testify/require" +) + +func TestNodeDOTID(t *testing.T) { + operator := testutil.NewMockOperator("test") + operator.On("Outputs").Return(nil) + node := createOperatorNode(operator) + require.Equal(t, operator.ID(), node.DOTID()) +} + +func TestCreateNodeID(t *testing.T) { + nodeID := createNodeID("test_id") + require.Equal(t, int64(5795108767401590291), nodeID) +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index c09fe6f21..1a551edfb 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -1,184 +1,11 @@ -package pipeline - -import ( - "fmt" - "strings" - - "github.com/observiq/stanza/errors" - "github.com/observiq/stanza/operator" - "gonum.org/v1/gonum/graph/encoding/dot" - "gonum.org/v1/gonum/graph/simple" - "gonum.org/v1/gonum/graph/topo" -) - -// Pipeline is a directed graph of connected operators. -type Pipeline struct { - Graph *simple.DirectedGraph - running bool -} - -// Start will start the operators in a pipeline in reverse topological order. -func (p *Pipeline) Start() error { - if p.running { - return nil - } - - sortedNodes, _ := topo.Sort(p.Graph) - for i := len(sortedNodes) - 1; i >= 0; i-- { - operator := sortedNodes[i].(OperatorNode).Operator() - operator.Logger().Debug("Starting operator") - if err := operator.Start(); err != nil { - return err - } - operator.Logger().Debug("Started operator") - } - - p.running = true - return nil -} - -// Stop will stop the operators in a pipeline in topological order. 
-func (p *Pipeline) Stop() { - if !p.running { - return - } - - sortedNodes, _ := topo.Sort(p.Graph) - for _, node := range sortedNodes { - operator := node.(OperatorNode).Operator() - operator.Logger().Debug("Stopping operator") - _ = operator.Stop() - operator.Logger().Debug("Stopped operator") - } - - p.running = false -} - -// MarshalDot will encode the pipeline as a dot graph. -func (p *Pipeline) MarshalDot() ([]byte, error) { - return dot.Marshal(p.Graph, "G", "", " ") -} - -// addNodes will add operators as nodes to the supplied graph. -func addNodes(graph *simple.DirectedGraph, operators []operator.Operator) error { - for _, operator := range operators { - operatorNode := createOperatorNode(operator) - if graph.Node(operatorNode.ID()) != nil { - return errors.NewError( - fmt.Sprintf("operator with id '%s' already exists in pipeline", operatorNode.Operator().ID()), - "ensure that each operator has a unique `type` or `id`", - ) - } - - graph.AddNode(operatorNode) - } - return nil -} +//go:generate mockery --name=^(Pipeline)$ --output=../testutil --outpkg=testutil --case=snake -// connectNodes will connect the nodes in the supplied graph. -func connectNodes(graph *simple.DirectedGraph) error { - nodes := graph.Nodes() - for nodes.Next() { - node := nodes.Node().(OperatorNode) - if err := connectNode(graph, node); err != nil { - return err - } - } - - if _, err := topo.Sort(graph); err != nil { - return errors.NewError( - "pipeline has a circular dependency", - "ensure that all operators are connected in a straight, acyclic line", - "cycles", unorderableToCycles(err.(topo.Unorderable)), - ) - } - - return nil -} - -// connectNode will connect a node to its outputs in the supplied graph. 
-func connectNode(graph *simple.DirectedGraph, inputNode OperatorNode) error { - for outputOperatorID, outputNodeID := range inputNode.OutputIDs() { - if graph.Node(outputNodeID) == nil { - return errors.NewError( - "operators cannot be connected, because the output does not exist in the pipeline", - "ensure that the output operator is defined", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - outputNode := graph.Node(outputNodeID).(OperatorNode) - if !outputNode.Operator().CanProcess() { - return errors.NewError( - "operators cannot be connected, because the output operator can not process logs", - "ensure that the output operator can process logs (like a parser or destination)", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - if graph.HasEdgeFromTo(inputNode.ID(), outputNodeID) { - return errors.NewError( - "operators cannot be connected, because a connection already exists", - "ensure that only a single connection exists between the two operators", - "input_operator", inputNode.Operator().ID(), - "output_operator", outputOperatorID, - ) - } - - edge := graph.NewEdge(inputNode, outputNode) - graph.SetEdge(edge) - } - - return nil -} - -// setOperatorOutputs will set the outputs on operators that can output. -func setOperatorOutputs(operators []operator.Operator) error { - for _, operator := range operators { - if !operator.CanOutput() { - continue - } - - if err := operator.SetOutputs(operators); err != nil { - return errors.WithDetails(err, "operator_id", operator.ID()) - } - } - return nil -} - -// NewPipeline creates a new pipeline of connected operators. 
-func NewPipeline(operators []operator.Operator) (*Pipeline, error) { - if err := setOperatorOutputs(operators); err != nil { - return nil, err - } - - graph := simple.NewDirectedGraph() - if err := addNodes(graph, operators); err != nil { - return nil, err - } - - if err := connectNodes(graph); err != nil { - return nil, err - } - - return &Pipeline{Graph: graph}, nil -} +package pipeline -func unorderableToCycles(err topo.Unorderable) string { - var cycles strings.Builder - for i, cycle := range err { - if i != 0 { - cycles.WriteByte(',') - } - cycles.WriteByte('(') - for _, node := range cycle { - cycles.WriteString(node.(OperatorNode).operator.ID()) - cycles.Write([]byte(` -> `)) - } - cycles.WriteString(cycle[0].(OperatorNode).operator.ID()) - cycles.WriteByte(')') - } - return cycles.String() +// Pipeline is a collection of connected operators that exchange entries +type Pipeline interface { + Start() error + Stop() error + Render() ([]byte, error) + Running() bool } diff --git a/operator/plugin_parameter.go b/plugin/parameter.go similarity index 85% rename from operator/plugin_parameter.go rename to plugin/parameter.go index db6d87147..14e1db9f7 100644 --- a/operator/plugin_parameter.go +++ b/plugin/parameter.go @@ -1,4 +1,4 @@ -package operator +package plugin import ( "fmt" @@ -14,8 +14,8 @@ const ( enumType = "enum" ) -// PluginParameter is a basic description of a plugin's parameter. -type PluginParameter struct { +// Parameter is a basic description of a plugin's parameter. 
+type Parameter struct { Label string Description string Required bool @@ -24,7 +24,7 @@ type PluginParameter struct { Default interface{} // Must be valid according to Type & ValidValues } -func (param PluginParameter) validate() error { +func (param Parameter) validate() error { if param.Required && param.Default != nil { return errors.NewError( "required parameter cannot have a default value", @@ -47,7 +47,7 @@ func (param PluginParameter) validate() error { return nil } -func (param PluginParameter) validateType() error { +func (param Parameter) validateType() error { switch param.Type { case stringType, intType, boolType, stringsType, enumType: // ok default: @@ -59,7 +59,7 @@ func (param PluginParameter) validateType() error { return nil } -func (param PluginParameter) validateValidValues() error { +func (param Parameter) validateValidValues() error { switch param.Type { case stringType, intType, boolType, stringsType: if len(param.ValidValues) > 0 { @@ -79,7 +79,7 @@ func (param PluginParameter) validateValidValues() error { return nil } -func (param PluginParameter) validateDefault() error { +func (param Parameter) validateDefault() error { if param.Default == nil { return nil } @@ -104,7 +104,7 @@ func (param PluginParameter) validateDefault() error { } } -func validateStringDefault(param PluginParameter) error { +func validateStringDefault(param Parameter) error { if _, ok := param.Default.(string); !ok { return errors.NewError( "default value for a parameter of type 'string' must be a string", @@ -114,7 +114,7 @@ func validateStringDefault(param PluginParameter) error { return nil } -func validateIntDefault(param PluginParameter) error { +func validateIntDefault(param Parameter) error { switch param.Default.(type) { case int, int32, int64: return nil @@ -126,7 +126,7 @@ func validateIntDefault(param PluginParameter) error { } } -func validateBoolDefault(param PluginParameter) error { +func validateBoolDefault(param Parameter) error { if _, ok := 
param.Default.(bool); !ok { return errors.NewError( "default value for a parameter of type 'bool' must be a boolean", @@ -136,7 +136,7 @@ func validateBoolDefault(param PluginParameter) error { return nil } -func validateStringArrayDefault(param PluginParameter) error { +func validateStringArrayDefault(param Parameter) error { defaultList, ok := param.Default.([]interface{}) if !ok { return errors.NewError( @@ -155,7 +155,7 @@ func validateStringArrayDefault(param PluginParameter) error { return nil } -func validateEnumDefault(param PluginParameter) error { +func validateEnumDefault(param Parameter) error { def, ok := param.Default.(string) if !ok { return errors.NewError( diff --git a/plugin/parameter_test.go b/plugin/parameter_test.go new file mode 100644 index 000000000..723e4de24 --- /dev/null +++ b/plugin/parameter_test.go @@ -0,0 +1,126 @@ +package plugin + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValidateDefault(t *testing.T) { + testCases := []struct { + name string + expectErr bool + param Parameter + }{ + { + "ValidStringDefault", + false, + Parameter{ + Type: "string", + Default: "test", + }, + }, + { + "InvalidStringDefault", + true, + Parameter{ + Type: "string", + Default: 5, + }, + }, + { + "ValidIntDefault", + false, + Parameter{ + Type: "int", + Default: 5, + }, + }, + { + "InvalidStringDefault", + true, + Parameter{ + Type: "int", + Default: "test", + }, + }, + { + "ValidBoolDefault", + false, + Parameter{ + Type: "bool", + Default: true, + }, + }, + { + "InvalidBoolDefault", + true, + Parameter{ + Type: "bool", + Default: "test", + }, + }, + { + "ValidStringsDefault", + false, + Parameter{ + Type: "strings", + Default: []interface{}{"test"}, + }, + }, + { + "InvalidStringsDefault", + true, + Parameter{ + Type: "strings", + Default: []interface{}{5}, + }, + }, + { + "ValidEnumDefault", + false, + Parameter{ + Type: "enum", + ValidValues: []string{"test"}, + Default: "test", + }, + }, + { + 
"InvalidEnumDefault", + true, + Parameter{ + Type: "enum", + ValidValues: []string{"test"}, + Default: "invalid", + }, + }, + { + "NonStringEnumDefault", + true, + Parameter{ + Type: "enum", + ValidValues: []string{"test"}, + Default: 5, + }, + }, + { + "InvalidTypeDefault", + true, + Parameter{ + Type: "float", + Default: 5, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.param.validateDefault() + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/operator/plugin.go b/plugin/plugin.go similarity index 72% rename from operator/plugin.go rename to plugin/plugin.go index ea14a73e9..53b7c2257 100644 --- a/operator/plugin.go +++ b/plugin/plugin.go @@ -1,4 +1,4 @@ -package operator +package plugin import ( "bytes" @@ -9,26 +9,27 @@ import ( "text/template" "github.com/observiq/stanza/errors" + "github.com/observiq/stanza/operator" yaml "gopkg.in/yaml.v2" ) -// PluginConfig is the rendered config of a plugin. -type PluginConfig struct { +// Plugin is the rendered result of a plugin template. +type Plugin struct { Version string Title string Description string - Parameters map[string]PluginParameter - Pipeline []Config + Parameters map[string]Parameter + Pipeline []operator.Config } -// PluginRegistry is a registry of plugin templates. -type PluginRegistry map[string]*template.Template +// Registry is a registry of plugin templates. +type Registry map[string]*template.Template -// Render will render a plugin config using the params and plugin type. -func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) (PluginConfig, error) { +// Render will render a plugin using the params and plugin type. 
+func (r Registry) Render(pluginType string, params map[string]interface{}) (Plugin, error) { template, ok := r[pluginType] if !ok { - return PluginConfig{}, errors.NewError( + return Plugin{}, errors.NewError( "plugin type does not exist", "ensure that all plugins are defined with a registered type", "plugin_type", pluginType, @@ -37,7 +38,7 @@ func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) var writer bytes.Buffer if err := template.Execute(&writer, params); err != nil { - return PluginConfig{}, errors.NewError( + return Plugin{}, errors.NewError( "failed to render template for plugin", "ensure that all parameters are valid for the plugin", "plugin_type", pluginType, @@ -45,10 +46,10 @@ func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) ) } - var config PluginConfig - if err := yaml.UnmarshalStrict(writer.Bytes(), &config); err != nil { - return PluginConfig{}, errors.NewError( - "failed to unmarshal plugin template to plugin config", + var plugin Plugin + if err := yaml.UnmarshalStrict(writer.Bytes(), &plugin); err != nil { + return Plugin{}, errors.NewError( + "failed to unmarshal plugin template to plugin", "ensure that the plugin template renders a valid pipeline", "plugin_type", pluginType, "rendered_config", writer.String(), @@ -56,10 +57,10 @@ func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) ) } - for name, param := range config.Parameters { + for name, param := range plugin.Parameters { if err := param.validate(); err != nil { - return PluginConfig{}, errors.NewError( - "invalid parameter found in plugin config", + return Plugin{}, errors.NewError( + "invalid parameter found in plugin", "ensure that all parameters are valid for the plugin", "plugin_type", pluginType, "plugin_parameter", name, @@ -69,17 +70,17 @@ func (r PluginRegistry) Render(pluginType string, params map[string]interface{}) } } - return config, nil + return plugin, nil } // IsDefined returns a 
boolean indicating if a plugin is defined and registered. -func (r PluginRegistry) IsDefined(pluginType string) bool { +func (r Registry) IsDefined(pluginType string) bool { _, ok := r[pluginType] return ok } // LoadAll will load all plugin templates contained in a directory. -func (r PluginRegistry) LoadAll(dir string, pattern string) error { +func (r Registry) LoadAll(dir string, pattern string) error { glob := filepath.Join(dir, pattern) filePaths, err := filepath.Glob(glob) if err != nil { @@ -109,7 +110,7 @@ func (r PluginRegistry) LoadAll(dir string, pattern string) error { } // Load will load a plugin template from a file path. -func (r PluginRegistry) Load(path string) error { +func (r Registry) Load(path string) error { fileName := filepath.Base(path) pluginType := strings.TrimSuffix(fileName, filepath.Ext(fileName)) @@ -122,8 +123,8 @@ func (r PluginRegistry) Load(path string) error { } // Add will add a plugin to the registry. -func (r PluginRegistry) Add(pluginType string, contents string) error { - if IsDefined(pluginType) { +func (r Registry) Add(pluginType string, contents string) error { + if operator.IsDefined(pluginType) { return fmt.Errorf("plugin type %s already exists as a builtin plugin", pluginType) } @@ -137,8 +138,8 @@ func (r PluginRegistry) Add(pluginType string, contents string) error { } // NewPluginRegistry creates a new plugin registry from a plugin directory. 
-func NewPluginRegistry(dir string) (PluginRegistry, error) { - registry := PluginRegistry{} +func NewPluginRegistry(dir string) (Registry, error) { + registry := Registry{} if err := registry.LoadAll(dir, "*.yaml"); err != nil { return registry, err } diff --git a/operator/plugin_test.go b/plugin/plugin_test.go similarity index 90% rename from operator/plugin_test.go rename to plugin/plugin_test.go index 0c1e5ccd8..3cff0f42d 100644 --- a/operator/plugin_test.go +++ b/plugin/plugin_test.go @@ -1,4 +1,4 @@ -package operator +package plugin import ( "io/ioutil" @@ -7,6 +7,7 @@ import ( "testing" "text/template" + "github.com/observiq/stanza/operator" "github.com/stretchr/testify/require" ) @@ -23,12 +24,8 @@ func NewTempDir(t *testing.T) string { return tempDir } -func TestPluginRegistry_LoadAll(t *testing.T) { - tempDir, err := ioutil.TempDir("", "") - require.NoError(t, err) - t.Cleanup(func() { - os.RemoveAll(tempDir) - }) +func TestNewRegistry(t *testing.T) { + tempDir := NewTempDir(t) test1 := []byte(` id: my_generator @@ -46,21 +43,31 @@ record: message2: {{ .message }} `) - err = ioutil.WriteFile(filepath.Join(tempDir, "test1.yaml"), test1, 0666) + err := ioutil.WriteFile(filepath.Join(tempDir, "test1.yaml"), test1, 0666) require.NoError(t, err) err = ioutil.WriteFile(filepath.Join(tempDir, "test2.yaml"), test2, 0666) require.NoError(t, err) - pluginRegistry := PluginRegistry{} - err = pluginRegistry.LoadAll(tempDir, "*.yaml") + registry, err := NewPluginRegistry(tempDir) require.NoError(t, err) - require.Equal(t, 2, len(pluginRegistry)) + require.Equal(t, 2, len(registry)) + require.True(t, registry.IsDefined("test1")) + require.True(t, registry.IsDefined("test2")) } -func TestPluginRegistryRender(t *testing.T) { +func TestNewRegistryFailure(t *testing.T) { + tempDir := NewTempDir(t) + err := ioutil.WriteFile(filepath.Join(tempDir, "invalid.yaml"), []byte("pipeline:"), 0111) + require.NoError(t, err) + + _, err = NewPluginRegistry(tempDir) + require.Error(t, 
err) +} + +func TestRegistryRender(t *testing.T) { t.Run("ErrorTypeDoesNotExist", func(t *testing.T) { - reg := PluginRegistry{} + reg := Registry{} _, err := reg.Render("unknown", map[string]interface{}{}) require.Error(t, err) require.Contains(t, err.Error(), "does not exist") @@ -70,7 +77,7 @@ func TestPluginRegistryRender(t *testing.T) { tmpl, err := template.New("plugintype").Parse(`{{ .panicker }}`) require.NoError(t, err) - reg := PluginRegistry{ + reg := Registry{ "plugintype": tmpl, } params := map[string]interface{}{ @@ -83,24 +90,24 @@ func TestPluginRegistryRender(t *testing.T) { }) } -func TestPluginRegistryLoad(t *testing.T) { +func TestRegistryLoad(t *testing.T) { t.Run("LoadAllBadGlob", func(t *testing.T) { - reg := PluginRegistry{} + reg := Registry{} err := reg.LoadAll("", `[]`) require.Error(t, err) require.Contains(t, err.Error(), "with glob pattern") }) t.Run("AddDuplicate", func(t *testing.T) { - reg := PluginRegistry{} - Register("copy", func() Builder { return nil }) + reg := Registry{} + operator.Register("copy", func() operator.Builder { return nil }) err := reg.Add("copy", "pipeline:\n") require.Error(t, err) require.Contains(t, err.Error(), "already exists") }) t.Run("AddBadTemplate", func(t *testing.T) { - reg := PluginRegistry{} + reg := Registry{} err := reg.Add("new", "{{ nofunc }") require.Error(t, err) require.Contains(t, err.Error(), "as a plugin template") @@ -112,7 +119,7 @@ func TestPluginRegistryLoad(t *testing.T) { err := ioutil.WriteFile(pluginPath, []byte("pipeline:\n"), 0755) require.NoError(t, err) - reg := PluginRegistry{} + reg := Registry{} err = reg.LoadAll(tempDir, "*.yaml") require.Error(t, err) }) @@ -614,7 +621,7 @@ pipeline: for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - reg := PluginRegistry{} + reg := Registry{} err := reg.Add(tc.name, tc.template) require.NoError(t, err) _, err = reg.Render(tc.name, map[string]interface{}{}) @@ -626,3 +633,13 @@ pipeline: }) } } + +func 
TestDefaultPluginFuncWithValue(t *testing.T) { + result := defaultPluginFunc("default_value", "supplied_value") + require.Equal(t, "supplied_value", result) +} + +func TestDefaultPluginFuncWithoutValue(t *testing.T) { + result := defaultPluginFunc("default_value", nil) + require.Equal(t, "default_value", result) +} diff --git a/testutil/database.go b/testutil/database.go new file mode 100644 index 000000000..afb0b2507 --- /dev/null +++ b/testutil/database.go @@ -0,0 +1,70 @@ +// Code generated by mockery v2.2.1. DO NOT EDIT. + +package testutil + +import ( + bbolt "go.etcd.io/bbolt" + + mock "github.com/stretchr/testify/mock" +) + +// Database is an autogenerated mock type for the Database type +type Database struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Database) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Sync provides a mock function with given fields: +func (_m *Database) Sync() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Update provides a mock function with given fields: _a0 +func (_m *Database) Update(_a0 func(*bbolt.Tx) error) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(func(*bbolt.Tx) error) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// View provides a mock function with given fields: _a0 +func (_m *Database) View(_a0 func(*bbolt.Tx) error) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(func(*bbolt.Tx) error) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/testutil/operator.go b/testutil/operator.go index 4c2f490de..4f409d1fb 100644 --- a/testutil/operator.go +++ b/testutil/operator.go @@ -1,4 +1,4 @@ -// Code 
generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.2.1. DO NOT EDIT. package testutil diff --git a/testutil/operator_builder.go b/testutil/operator_builder.go new file mode 100644 index 000000000..0ca10b785 --- /dev/null +++ b/testutil/operator_builder.go @@ -0,0 +1,76 @@ +// Code generated by mockery v2.2.1. DO NOT EDIT. + +package testutil + +import ( + operator "github.com/observiq/stanza/operator" + mock "github.com/stretchr/testify/mock" +) + +// OperatorBuilder is an autogenerated mock type for the Builder type +type OperatorBuilder struct { + mock.Mock +} + +// Build provides a mock function with given fields: _a0 +func (_m *OperatorBuilder) Build(_a0 operator.BuildContext) (operator.Operator, error) { + ret := _m.Called(_a0) + + var r0 operator.Operator + if rf, ok := ret.Get(0).(func(operator.BuildContext) operator.Operator); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(operator.Operator) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(operator.BuildContext) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ID provides a mock function with given fields: +func (_m *OperatorBuilder) ID() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// SetNamespace provides a mock function with given fields: namespace, exclude +func (_m *OperatorBuilder) SetNamespace(namespace string, exclude ...string) { + _va := make([]interface{}, len(exclude)) + for _i := range exclude { + _va[_i] = exclude[_i] + } + var _ca []interface{} + _ca = append(_ca, namespace) + _ca = append(_ca, _va...) + _m.Called(_ca...) 
+} + +// Type provides a mock function with given fields: +func (_m *OperatorBuilder) Type() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} diff --git a/testutil/pipeline.go b/testutil/pipeline.go new file mode 100644 index 000000000..c147bb215 --- /dev/null +++ b/testutil/pipeline.go @@ -0,0 +1,75 @@ +// Code generated by mockery v2.2.1. DO NOT EDIT. + +package testutil + +import mock "github.com/stretchr/testify/mock" + +// Pipeline is an autogenerated mock type for the Pipeline type +type Pipeline struct { + mock.Mock +} + +// Render provides a mock function with given fields: +func (_m *Pipeline) Render() ([]byte, error) { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Running provides a mock function with given fields: +func (_m *Pipeline) Running() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Start provides a mock function with given fields: +func (_m *Pipeline) Start() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stop provides a mock function with given fields: +func (_m *Pipeline) Stop() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/testutil/util.go b/testutil/util.go index fdabbbf5a..2754e14bd 100644 --- a/testutil/util.go +++ b/testutil/util.go @@ -55,9 +55,8 @@ func NewTestDatabase(t testing.TB) *bbolt.DB { // NewBuildContext 
will return a new build context for testing func NewBuildContext(t testing.TB) operator.BuildContext { return operator.BuildContext{ - PluginRegistry: make(operator.PluginRegistry), - Database: NewTestDatabase(t), - Logger: zaptest.NewLogger(t).Sugar(), + Database: NewTestDatabase(t), + Logger: zaptest.NewLogger(t).Sugar(), } }