Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator refactor #53

Merged
merged 7 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ jobs:
command: make test

test-windows:
executor:
name: win/default
size: large
executor: win/default

steps:
- checkout
Expand Down Expand Up @@ -177,7 +175,7 @@ jobs:
command: mkdir {bin,out,tmp}
- run:
name: Build Carbon Agent
command: go get -v -t -d ./... && go build -v -o ./bin/carbon ./
command: GOPROXY=direct go build -v -o ./bin/carbon ./
- run:
name: Build Log Bench
command: GOPROXY=direct go get github.com/observiq/amazon-log-agent-benchmark-tool/cmd/logbench/ &&
Expand Down
38 changes: 19 additions & 19 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,57 +68,57 @@ A PR is considered to be **ready to merge** when:

## Design Choices

Best practices for developing a builtin plugin are documented below, but for changes to
Best practices for developing a builtin operator are documented below, but for changes to
the core agent, we are happy to discuss proposals in the issue tracker.

### Builtin Plugin Development
### Builtin Operator Development

In order to write a builtin plugin, follow these three steps:
1. Build a unique struct that satisfies the [`Plugin`](plugin/plugin.go) interface. This struct will define what your plugin does when executed in the pipeline.
In order to write a builtin operator, follow these three steps:
1. Build a unique struct that satisfies the [`Operator`](operator/operator.go) interface. This struct will define what your operator does when executed in the pipeline.

```go
type ExamplePlugin struct {
type ExampleOperator struct {
FilePath string
}

func (p *ExamplePlugin) Process(ctx context.Context, entry *entry.Entry) error {
func (p *ExampleOperator) Process(ctx context.Context, entry *entry.Entry) error {
// Processing logic
}
```

2. Build a unique config struct that satisfies the [`Config`](plugin/config.go) interface. This struct will define the parameters used to configure and build your plugin struct in step 1.
2. Build a unique config struct that satisfies the [`Config`](operator/config.go) interface. This struct will define the parameters used to configure and build your operator struct in step 1.

```go
type ExamplePluginConfig struct {
type ExampleOperatorConfig struct {
filePath string
}

func (c ExamplePluginConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) {
return &ExamplePlugin{
func (c ExampleOperatorConfig) Build(context operator.BuildContext) (operator.Operator, error) {
return &ExampleOperator{
filePath: c.FilePath,
}, nil
}
```

3. Register your config struct in the plugin registry using an `init()` hook. This will ensure that the agent knows about your plugin at runtime and can build it from a YAML config.
3. Register your config struct in the operator registry using an `init()` hook. This will ensure that the agent knows about your operator at runtime and can build it from a YAML config.

```go
func init() {
plugin.Register("example_plugin", &ExamplePluginConfig{})
operator.Register("example_operator", &ExampleOperatorConfig{})
}
```

## Any tips for building plugins?
We highly recommend that developers take advantage of [helpers](plugin/helper) when building their plugins. Helpers are structs that help satisfy common behavior shared across many plugins. By embedding these structs, you can skip having to satisfy certain aspects of the `plugin` and `config` interfaces.
## Any tips for building operators?
We highly recommend that developers take advantage of [helpers](operator/helper) when building their operators. Helpers are structs that help satisfy common behavior shared across many operators. By embedding these structs, you can skip having to satisfy certain aspects of the `operator` and `config` interfaces.

For example, almost all plugins should embed the [BasicPlugin](plugin/helper/basic_plugin.go) helper, as it provides simple functionality for returning a plugin id and plugin type.
For example, almost all operators should embed the [BasicOperator](operator/helper/basic_operator.go) helper, as it provides simple functionality for returning an operator id and operator type.

```go
// ExamplePlugin is a basic plugin, with a basic lifecycle, that consumes
// but doesn't send log entries. Rather than implementing every part of the plugin
// ExampleOperator is a basic operator, with a basic lifecycle, that consumes
// but doesn't send log entries. Rather than implementing every part of the operator
// interface, we can embed the following helpers to achieve this effect.
type ExamplePlugin struct {
helper.BasicPlugin
type ExampleOperator struct {
helper.BasicOperator
helper.BasicLifecycle
helper.BasicOutput
}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ carbon

# Supported flags:
--config The location of the agent config file (default: ./config.yaml)
--plugin_dir The location of the custom plugins directory (default: ./plugins)
--plugin_dir The location of the plugins directory (default: ./plugins)
--database The location of the offsets database file. If this is not specified, offsets will not be maintained across agent restarts
--log_file The location of the agent log file. If not specified, carbon will log to `stderr`
--debug Enables debug logging
Expand Down
22 changes: 11 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"time"

"github.com/observiq/carbon/errors"
"github.com/observiq/carbon/operator"
_ "github.com/observiq/carbon/operator/builtin" // register operators
"github.com/observiq/carbon/pipeline"
pg "github.com/observiq/carbon/plugin"
_ "github.com/observiq/carbon/plugin/builtin" // register plugins
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
Expand All @@ -21,7 +21,7 @@ type LogAgent struct {
Database string
*zap.SugaredLogger

database pg.Database
database operator.Database
pipeline *pipeline.Pipeline
running bool
}
Expand All @@ -39,13 +39,13 @@ func (a *LogAgent) Start() error {
}
a.database = database

registry, err := pg.NewCustomRegistry(a.PluginDir)
registry, err := operator.NewPluginRegistry(a.PluginDir)
if err != nil {
a.Errorw("Failed to load custom plugin registry", zap.Any("error", err))
a.Errorw("Failed to load plugin registry", zap.Any("error", err))
}

buildContext := pg.BuildContext{
CustomRegistry: registry,
buildContext := operator.BuildContext{
PluginRegistry: registry,
Logger: a.SugaredLogger,
Database: a.database,
}
Expand Down Expand Up @@ -83,9 +83,9 @@ func (a *LogAgent) Stop() {
}

// OpenDatabase will open and create a database.
func OpenDatabase(file string) (pg.Database, error) {
func OpenDatabase(file string) (operator.Database, error) {
if file == "" {
return pg.NewStubDatabase(), nil
return operator.NewStubDatabase(), nil
}

if _, err := os.Stat(filepath.Dir(file)); err != nil {
Expand All @@ -104,11 +104,11 @@ func OpenDatabase(file string) (pg.Database, error) {
}

// NewLogAgent creates a new carbon log agent.
func NewLogAgent(cfg *Config, logger *zap.SugaredLogger, pluginDir, databaseFile string) *LogAgent {
func NewLogAgent(cfg *Config, logger *zap.SugaredLogger, operatorDir, databaseFile string) *LogAgent {
return &LogAgent{
Config: cfg,
SugaredLogger: logger,
PluginDir: pluginDir,
PluginDir: operatorDir,
Database: databaseFile,
}
}
2 changes: 1 addition & 1 deletion commands/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"
"time"

"github.com/observiq/carbon/plugin/builtin/output"
"github.com/observiq/carbon/operator/builtin/output"
"github.com/stretchr/testify/require"
)

Expand Down
14 changes: 7 additions & 7 deletions commands/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"os"

"github.com/observiq/carbon/agent"
"github.com/observiq/carbon/plugin"
pg "github.com/observiq/carbon/plugin"
"github.com/observiq/carbon/operator"
pg "github.com/observiq/carbon/operator"
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -21,7 +21,7 @@ func NewGraphCommand(rootFlags *RootFlags) *cobra.Command {
return &cobra.Command{
Use: "graph",
Args: cobra.NoArgs,
Short: "Export a dot-formatted representation of the plugin graph",
Short: "Export a dot-formatted representation of the operator graph",
Run: func(command *cobra.Command, args []string) { runGraph(command, args, rootFlags) },
}
}
Expand All @@ -43,19 +43,19 @@ func runGraph(_ *cobra.Command, _ []string, flags *RootFlags) {
os.Exit(1)
}

customRegistry, err := plugin.NewCustomRegistry(flags.PluginDir)
pluginRegistry, err := operator.NewPluginRegistry(flags.PluginDir)
if err != nil {
logger.Errorw("Failed to load custom plugin registry", zap.Any("error", err))
logger.Errorw("Failed to load plugin registry", zap.Any("error", err))
}

buildContext := pg.BuildContext{
CustomRegistry: customRegistry,
PluginRegistry: pluginRegistry,
Logger: logger,
}

pipeline, err := cfg.Pipeline.BuildPipeline(buildContext)
if err != nil {
logger.Errorw("Failed to build plugin pipeline", zap.Any("error", err))
logger.Errorw("Failed to build operator pipeline", zap.Any("error", err))
os.Exit(1)
}

Expand Down
16 changes: 8 additions & 8 deletions commands/offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"

agent "github.com/observiq/carbon/agent"
"github.com/observiq/carbon/plugin/helper"
"github.com/observiq/carbon/operator/helper"
"github.com/spf13/cobra"
"go.etcd.io/bbolt"
)
Expand All @@ -17,7 +17,7 @@ var stdout io.Writer = os.Stdout
func NewOffsetsCmd(rootFlags *RootFlags) *cobra.Command {
offsets := &cobra.Command{
Use: "offsets",
Short: "Manage input plugin offsets",
Short: "Manage input operator offsets",
Args: cobra.NoArgs,
Run: func(command *cobra.Command, args []string) {
stdout.Write([]byte("No offsets subcommand specified. See `carbon offsets help` for details\n"))
Expand All @@ -35,7 +35,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {
var all bool

offsetsClear := &cobra.Command{
Use: "clear [flags] [plugin_ids]",
Use: "clear [flags] [operator_ids]",
Short: "Clear persisted offsets from the database",
Args: cobra.ArbitraryArgs,
Run: func(command *cobra.Command, args []string) {
Expand All @@ -46,7 +46,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {

if all {
if len(args) != 0 {
stdout.Write([]byte("Providing a list of plugin IDs does nothing with the --all flag\n"))
stdout.Write([]byte("Providing a list of operator IDs does nothing with the --all flag\n"))
}

err := db.Update(func(tx *bbolt.Tx) error {
Expand All @@ -59,18 +59,18 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {
exitOnErr("Failed to delete offsets", err)
} else {
if len(args) == 0 {
stdout.Write([]byte("Must either specify a list of plugins or the --all flag\n"))
stdout.Write([]byte("Must either specify a list of operators or the --all flag\n"))
os.Exit(1)
}

for _, pluginID := range args {
for _, operatorID := range args {
err = db.Update(func(tx *bbolt.Tx) error {
offsetBucket := tx.Bucket(helper.OffsetsBucket)
if offsetBucket == nil {
return nil
}

return offsetBucket.DeleteBucket([]byte(pluginID))
return offsetBucket.DeleteBucket([]byte(operatorID))
})
exitOnErr("Failed to delete offsets", err)
}
Expand All @@ -87,7 +87,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {
func NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command {
offsetsList := &cobra.Command{
Use: "list",
Short: "List plugins with persisted offsets",
Short: "List operators with persisted offsets",
Args: cobra.NoArgs,
Run: func(command *cobra.Command, args []string) {
db, err := agent.OpenDatabase(rootFlags.DatabaseFile)
Expand Down
16 changes: 8 additions & 8 deletions commands/offsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"testing"

agent "github.com/observiq/carbon/agent"
"github.com/observiq/carbon/plugin/helper"
"github.com/observiq/carbon/operator/helper"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
Expand All @@ -33,15 +33,15 @@ func TestOffsets(t *testing.T) {
bucket, err := tx.CreateBucketIfNotExists(helper.OffsetsBucket)
require.NoError(t, err)

_, err = bucket.CreateBucket([]byte("$.testpluginid1"))
_, err = bucket.CreateBucket([]byte("$.testoperatorid1"))
require.NoError(t, err)
_, err = bucket.CreateBucket([]byte("$.testpluginid2"))
_, err = bucket.CreateBucket([]byte("$.testoperatorid2"))
require.NoError(t, err)
return nil
})
db.Close()

// check that offsets list actually lists the plugin
// check that offsets list actually lists the operator
offsetsList := NewRootCmd()
offsetsList.SetArgs([]string{
"offsets", "list",
Expand All @@ -51,24 +51,24 @@ func TestOffsets(t *testing.T) {

err = offsetsList.Execute()
require.NoError(t, err)
require.Equal(t, "$.testpluginid1\n$.testpluginid2\n", buf.String())
require.Equal(t, "$.testoperatorid1\n$.testoperatorid2\n", buf.String())

// clear the offsets
offsetsClear := NewRootCmd()
offsetsClear.SetArgs([]string{
"offsets", "clear",
"--database", databasePath,
"--config", configPath,
"$.testpluginid2",
"$.testoperatorid2",
})

err = offsetsClear.Execute()
require.NoError(t, err)

// Check that offsets list only shows uncleared plugin id
// Check that offsets list only shows uncleared operator id
buf.Reset()
err = offsetsList.Execute()
require.NoError(t, err)
require.Equal(t, "$.testpluginid1\n", buf.String())
require.Equal(t, "$.testoperatorid1\n", buf.String())

}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/observiq/ctimefmt v1.0.0
github.com/pkg/errors v0.9.1 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.5.1
go.etcd.io/bbolt v1.3.4
go.uber.org/zap v1.15.0
Expand All @@ -34,7 +33,7 @@ require (
google.golang.org/grpc v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.3.0
honnef.co/go/tools v0.0.1-2020.1.3
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
k8s.io/apimachinery v0.18.4
k8s.io/client-go v0.18.4
)
Loading