Skip to content

Commit

Permalink
Default output (#101)
Browse files Browse the repository at this point in the history
* Add default output capability

Co-authored-by: Joshua Williams <[email protected]>
Co-authored-by: Camden Cheek <[email protected]>
  • Loading branch information
3 people authored Aug 25, 2020
1 parent 445b939 commit 57466e2
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ dev/*
.DS_Store
tmp/*
coverage.txt
coverage.html
artifacts/*
.vscode/*
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ install-tools:
.PHONY: test
test:
go test -race -coverprofile coverage.txt -coverpkg ./... ./...
go tool cover -html=coverage.txt -o coverage.html

.PHONY: bench
bench:
Expand Down
44 changes: 36 additions & 8 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,61 @@ func OpenDatabase(file string) (operator.Database, error) {
return bbolt.Open(file, 0666, options)
}

// NewLogAgent creates a new stanza log agent.
func NewLogAgent(cfg *Config, logger *zap.SugaredLogger, pluginDir, databaseFile string, buildParams map[string]interface{}) (*LogAgent, error) {
database, err := OpenDatabase(databaseFile)
type LogAgentBuilder struct {
cfg *Config
logger *zap.SugaredLogger
pluginDir string
databaseFile string
defaultOutput operator.Operator
}

func NewBuilder(cfg *Config, logger *zap.SugaredLogger) *LogAgentBuilder {
return &LogAgentBuilder{
cfg: cfg,
logger: logger,
}
}

func (b *LogAgentBuilder) WithPluginDir(pluginDir string) *LogAgentBuilder {
b.pluginDir = pluginDir
return b
}

func (b *LogAgentBuilder) WithDatabaseFile(databaseFile string) *LogAgentBuilder {
b.databaseFile = databaseFile
return b
}

func (b *LogAgentBuilder) WithDefaultOutput(defaultOutput operator.Operator) *LogAgentBuilder {
b.defaultOutput = defaultOutput
return b
}

func (b *LogAgentBuilder) Build() (*LogAgent, error) {
database, err := OpenDatabase(b.databaseFile)
if err != nil {
return nil, errors.Wrap(err, "open database")
}

registry, err := operator.NewPluginRegistry(pluginDir)
registry, err := operator.NewPluginRegistry(b.pluginDir)
if err != nil {
return nil, errors.Wrap(err, "load plugin registry")
}

buildContext := operator.BuildContext{
Logger: b.logger,
PluginRegistry: registry,
Logger: logger,
Database: database,
Parameters: buildParams,
}

pipeline, err := cfg.Pipeline.BuildPipeline(buildContext)
pipeline, err := b.cfg.Pipeline.BuildPipeline(buildContext, b.defaultOutput)
if err != nil {
return nil, err
}

return &LogAgent{
pipeline: pipeline,
database: database,
SugaredLogger: logger,
SugaredLogger: b.logger,
}, nil
}
5 changes: 4 additions & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ func TestNewLogAgent(t *testing.T) {
mockLogger := zap.NewNop().Sugar()
mockPluginDir := "/some/path/plugins"
mockDatabaseFile := ""
agent, err := NewLogAgent(&mockCfg, mockLogger, mockPluginDir, mockDatabaseFile, nil)
agent, err := NewBuilder(&mockCfg, mockLogger).
WithPluginDir(mockPluginDir).
WithDatabaseFile(mockDatabaseFile).
Build()
require.NoError(t, err)

require.Equal(t, mockLogger, agent.SugaredLogger)
Expand Down
2 changes: 1 addition & 1 deletion commands/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func runGraph(_ *cobra.Command, _ []string, flags *RootFlags) {
Logger: logger,
}

pipeline, err := cfg.Pipeline.BuildPipeline(buildContext)
pipeline, err := cfg.Pipeline.BuildPipeline(buildContext, nil)
if err != nil {
logger.Errorw("Failed to build operator pipeline", zap.Any("error", err))
os.Exit(1)
Expand Down
5 changes: 4 additions & 1 deletion commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ func runRoot(command *cobra.Command, _ []string, flags *RootFlags) {
}
logger.Debugw("Parsed config", "config", cfg)

agent, err := agent.NewLogAgent(cfg, logger, flags.PluginDir, flags.DatabaseFile, nil)
agent, err := agent.NewBuilder(cfg, logger).
WithPluginDir(flags.PluginDir).
WithDatabaseFile(flags.DatabaseFile).
Build()
if err != nil {
logger.Errorw("Failed to build agent", zap.Error(err))
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions docs/operators/json_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The `json_parser` operator parses the string-type field selected by `parse_from`
| `preserve` | false | Preserve the unparsed value on the record |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator |
| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator |


### Example Configurations
Expand Down
1 change: 1 addition & 0 deletions docs/operators/regex_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The `regex_parser` operator parses the string-type field selected by `parse_from
| `preserve` | false | Preserve the unparsed value on the record |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator |
| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator |

### Example Configurations

Expand Down
20 changes: 11 additions & 9 deletions docs/operators/syslog_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ The `syslog_parser` operator parses the string-type field selected by `parse_fro

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `syslog_parser` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as JSON |
| `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as JSON |
| `preserve` | false | Preserve the unparsed value on the record |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `protocol` | required | The protocol to parse the syslog messages as. Options are `rfc3164` and `rfc5424` |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `syslog_parser` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as JSON |
| `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as JSON |
| `preserve` | false | Preserve the unparsed value on the record |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `protocol` | required | The protocol to parse the syslog messages as. Options are `rfc3164` and `rfc5424` |
| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator |
| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator |

### Example Configurations

Expand Down
6 changes: 5 additions & 1 deletion pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type Config []Params

// BuildPipeline will build a pipeline from the config.
func (c Config) BuildPipeline(context operator.BuildContext) (*Pipeline, error) {
func (c Config) BuildPipeline(context operator.BuildContext, defaultOutput operator.Operator) (*Pipeline, error) {
operatorConfigs, err := c.buildOperatorConfigs(context.PluginRegistry)
if err != nil {
return nil, err
Expand All @@ -25,6 +25,10 @@ func (c Config) BuildPipeline(context operator.BuildContext) (*Pipeline, error)
return nil, err
}

if defaultOutput != nil {
operators = append(operators, defaultOutput)
}

pipeline, err := NewPipeline(operators)
if err != nil {
return nil, err
Expand Down
93 changes: 88 additions & 5 deletions pipeline/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/observiq/stanza/operator"
_ "github.com/observiq/stanza/operator/builtin"
"github.com/observiq/stanza/operator/builtin/output"
"github.com/observiq/stanza/operator/builtin/transformer"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -275,10 +276,92 @@ pipeline:
},
}

_, err = pipelineConfig.BuildPipeline(context)
_, err = pipelineConfig.BuildPipeline(context, nil)
require.NoError(t, err)
}

func TestBuildValidPipelineDefaultOutput(t *testing.T) {
context := testutil.NewBuildContext(t)

pipelineConfig := Config{
Params{
"id": "generate_input",
"type": "generate_input",
"count": 1,
"entry": map[string]interface{}{
"record": map[string]interface{}{
"message": "test",
},
},
},
}

defaultOutput, err := output.NewDropOutputConfig("$.drop_it").Build(context)
require.NoError(t, err)

pl, err := pipelineConfig.BuildPipeline(context, defaultOutput)
require.NoError(t, err)
require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.generate_input"), createNodeID("$.drop_it")))
}

func TestBuildValidPipelineNextOutputAndDefaultOutput(t *testing.T) {
context := testutil.NewBuildContext(t)

pipelineConfig := Config{
Params{
"id": "generate_input",
"type": "generate_input",
"count": 1,
"entry": map[string]interface{}{
"record": map[string]interface{}{
"message": "test",
},
},
},
Params{
"id": "noop",
"type": "noop",
},
}

defaultOutput, err := output.NewDropOutputConfig("$.drop_it").Build(context)
require.NoError(t, err)

pl, err := pipelineConfig.BuildPipeline(context, defaultOutput)
require.NoError(t, err)
require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.generate_input"), createNodeID("$.noop")))
require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.noop"), createNodeID("$.drop_it")))
}

func TestBuildValidPluginDefaultOutput(t *testing.T) {
context := testutil.NewBuildContext(t)
pluginTemplate := `
pipeline:
- id: plugin_generate
type: generate_input
count: 1
entry:
record:
message: test
`
err := context.PluginRegistry.Add("plugin", pluginTemplate)
require.NoError(t, err)

pipelineConfig := Config{
Params{
"id": "plugin",
"type": "plugin",
},
}

defaultOutput, err := output.NewDropOutputConfig("$.drop_it").Build(context)
require.NoError(t, err)

pl, err := pipelineConfig.BuildPipeline(context, defaultOutput)
require.NoError(t, err)
require.True(t, pl.Graph.HasEdgeFromTo(createNodeID("$.plugin.plugin_generate"), createNodeID("$.drop_it")))
}

func TestBuildInvalidPipelineInvalidType(t *testing.T) {
context := testutil.NewBuildContext(t)

Expand All @@ -294,7 +377,7 @@ func TestBuildInvalidPipelineInvalidType(t *testing.T) {
},
}

_, err := pipelineConfig.BuildPipeline(context)
_, err := pipelineConfig.BuildPipeline(context, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "unsupported `type` for operator config")
}
Expand Down Expand Up @@ -325,7 +408,7 @@ pipeline:
},
}

_, err = pipelineConfig.BuildPipeline(context)
_, err = pipelineConfig.BuildPipeline(context, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "build operator configs")
}
Expand All @@ -344,7 +427,7 @@ func TestBuildInvalidPipelineInvalidOperator(t *testing.T) {
}

context := testutil.NewBuildContext(t)
_, err := pipelineConfig.BuildPipeline(context)
_, err := pipelineConfig.BuildPipeline(context, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "missing required parameter 'listen_address'")
}
Expand All @@ -369,7 +452,7 @@ func TestBuildInvalidPipelineInvalidGraph(t *testing.T) {
}

context := testutil.NewBuildContext(t)
_, err := pipelineConfig.BuildPipeline(context)
_, err := pipelineConfig.BuildPipeline(context, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "does not exist")
}
Expand Down
2 changes: 1 addition & 1 deletion pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestPipeline(t *testing.T) {

operator2 := testutil.NewMockOperator("operator2")
operator2.On("SetOutputs", mock.Anything).Return(nil)
operator2.On("Outputs").Return([]operator.Operator{operator1, operator1})
operator2.On("Outputs").Return([]operator.Operator{operator1, operator1}, nil)

node1 := createOperatorNode(operator1)
node2 := createOperatorNode(operator2)
Expand Down

0 comments on commit 57466e2

Please sign in to comment.