Skip to content

Commit

Permalink
Operator refactor (#53)
Browse files Browse the repository at this point in the history
* Renamed plugins to operators and custom to plugins
  • Loading branch information
djaglowski authored Jul 22, 2020
1 parent 2d51e89 commit b1e749e
Show file tree
Hide file tree
Showing 113 changed files with 2,344 additions and 2,583 deletions.
6 changes: 2 additions & 4 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ jobs:
command: make test

test-windows:
executor:
name: win/default
size: large
executor: win/default

steps:
- checkout
Expand Down Expand Up @@ -177,7 +175,7 @@ jobs:
command: mkdir {bin,out,tmp}
- run:
name: Build Carbon Agent
command: go get -v -t -d ./... && go build -v -o ./bin/carbon ./
command: GOPROXY=direct go build -v -o ./bin/carbon ./
- run:
name: Build Log Bench
command: GOPROXY=direct go get github.com/observiq/amazon-log-agent-benchmark-tool/cmd/logbench/ &&
Expand Down
38 changes: 19 additions & 19 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,57 +68,57 @@ A PR is considered to be **ready to merge** when:

## Design Choices

Best practices for developing a builtin plugin are documented below, but for changes to
Best practices for developing a builtin operator are documented below, but for changes to
the core agent, we are happy to discuss proposals in the issue tracker.

### Builtin Plugin Development
### Builtin Operator Development

In order to write a builtin plugin, follow these three steps:
1. Build a unique struct that satisfies the [`Plugin`](plugin/plugin.go) interface. This struct will define what your plugin does when executed in the pipeline.
In order to write a builtin operator, follow these three steps:
1. Build a unique struct that satisfies the [`Operator`](operator/operator.go) interface. This struct will define what your operator does when executed in the pipeline.

```go
type ExamplePlugin struct {
type ExampleOperator struct {
FilePath string
}

func (p *ExamplePlugin) Process(ctx context.Context, entry *entry.Entry) error {
func (p *ExampleOperator) Process(ctx context.Context, entry *entry.Entry) error {
// Processing logic
}
```

2. Build a unique config struct that satisfies the [`Config`](plugin/config.go) interface. This struct will define the parameters used to configure and build your plugin struct in step 1.
2. Build a unique config struct that satisfies the [`Config`](operator/config.go) interface. This struct will define the parameters used to configure and build your operator struct in step 1.

```go
type ExamplePluginConfig struct {
type ExampleOperatorConfig struct {
filePath string
}

func (c ExamplePluginConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) {
return &ExamplePlugin{
func (c ExampleOperatorConfig) Build(context operator.BuildContext) (operator.Operator, error) {
return &ExampleOperator{
filePath: c.FilePath,
}, nil
}
```

3. Register your config struct in the plugin registry using an `init()` hook. This will ensure that the agent knows about your plugin at runtime and can build it from a YAML config.
3. Register your config struct in the operator registry using an `init()` hook. This will ensure that the agent knows about your operator at runtime and can build it from a YAML config.

```go
func init() {
plugin.Register("example_plugin", &ExamplePluginConfig{})
operator.Register("example_operator", &ExampleOperatorConfig{})
}
```

## Any tips for building plugins?
We highly recommend that developers take advantage of [helpers](plugin/helper) when building their plugins. Helpers are structs that help satisfy common behavior shared across many plugins. By embedding these structs, you can skip having to satisfy certain aspects of the `plugin` and `config` interfaces.
## Any tips for building operators?
We highly recommend that developers take advantage of [helpers](operator/helper) when building their operators. Helpers are structs that help satisfy common behavior shared across many operators. By embedding these structs, you can skip having to satisfy certain aspects of the `operator` and `config` interfaces.

For example, almost all plugins should embed the [BasicPlugin](plugin/helper/basic_plugin.go) helper, as it provides simple functionality for returning a plugin id and plugin type.
For example, almost all operators should embed the [BasicOperator](operator/helper/basic_operator.go) helper, as it provides simple functionality for returning an operator id and operator type.

```go
// ExamplePlugin is a basic plugin, with a basic lifecycle, that consumes
// but doesn't send log entries. Rather than implementing every part of the plugin
// ExampleOperator is a basic operator, with a basic lifecycle, that consumes
// but doesn't send log entries. Rather than implementing every part of the operator
// interface, we can embed the following helpers to achieve this effect.
type ExamplePlugin struct {
helper.BasicPlugin
type ExampleOperator struct {
helper.BasicOperator
helper.BasicLifecycle
helper.BasicOutput
}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ carbon

# Supported flags:
--config The location of the agent config file (default: ./config.yaml)
--plugin_dir The location of the custom plugins directory (default: ./plugins)
--plugin_dir The location of the plugins directory (default: ./plugins)
--database The location of the offsets database file. If this is not specified, offsets will not be maintained across agent restarts
--log_file The location of the agent log file. If not specified, carbon will log to `stderr`
--debug Enables debug logging
Expand Down
22 changes: 11 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"time"

"github.com/observiq/carbon/errors"
"github.com/observiq/carbon/operator"
_ "github.com/observiq/carbon/operator/builtin" // register operators
"github.com/observiq/carbon/pipeline"
pg "github.com/observiq/carbon/plugin"
_ "github.com/observiq/carbon/plugin/builtin" // register plugins
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
Expand All @@ -21,7 +21,7 @@ type LogAgent struct {
Database string
*zap.SugaredLogger

database pg.Database
database operator.Database
pipeline *pipeline.Pipeline
running bool
}
Expand All @@ -39,13 +39,13 @@ func (a *LogAgent) Start() error {
}
a.database = database

registry, err := pg.NewCustomRegistry(a.PluginDir)
registry, err := operator.NewPluginRegistry(a.PluginDir)
if err != nil {
a.Errorw("Failed to load custom plugin registry", zap.Any("error", err))
a.Errorw("Failed to load plugin registry", zap.Any("error", err))
}

buildContext := pg.BuildContext{
CustomRegistry: registry,
buildContext := operator.BuildContext{
PluginRegistry: registry,
Logger: a.SugaredLogger,
Database: a.database,
}
Expand Down Expand Up @@ -83,9 +83,9 @@ func (a *LogAgent) Stop() {
}

// OpenDatabase will open and create a database.
func OpenDatabase(file string) (pg.Database, error) {
func OpenDatabase(file string) (operator.Database, error) {
if file == "" {
return pg.NewStubDatabase(), nil
return operator.NewStubDatabase(), nil
}

if _, err := os.Stat(filepath.Dir(file)); err != nil {
Expand All @@ -104,11 +104,11 @@ func OpenDatabase(file string) (pg.Database, error) {
}

// NewLogAgent creates a new carbon log agent.
func NewLogAgent(cfg *Config, logger *zap.SugaredLogger, pluginDir, databaseFile string) *LogAgent {
func NewLogAgent(cfg *Config, logger *zap.SugaredLogger, operatorDir, databaseFile string) *LogAgent {
return &LogAgent{
Config: cfg,
SugaredLogger: logger,
PluginDir: pluginDir,
PluginDir: operatorDir,
Database: databaseFile,
}
}
2 changes: 1 addition & 1 deletion commands/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"
"time"

"github.com/observiq/carbon/plugin/builtin/output"
"github.com/observiq/carbon/operator/builtin/output"
"github.com/stretchr/testify/require"
)

Expand Down
14 changes: 7 additions & 7 deletions commands/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"os"

"github.com/observiq/carbon/agent"
"github.com/observiq/carbon/plugin"
pg "github.com/observiq/carbon/plugin"
"github.com/observiq/carbon/operator"
pg "github.com/observiq/carbon/operator"
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -21,7 +21,7 @@ func NewGraphCommand(rootFlags *RootFlags) *cobra.Command {
return &cobra.Command{
Use: "graph",
Args: cobra.NoArgs,
Short: "Export a dot-formatted representation of the plugin graph",
Short: "Export a dot-formatted representation of the operator graph",
Run: func(command *cobra.Command, args []string) { runGraph(command, args, rootFlags) },
}
}
Expand All @@ -43,19 +43,19 @@ func runGraph(_ *cobra.Command, _ []string, flags *RootFlags) {
os.Exit(1)
}

customRegistry, err := plugin.NewCustomRegistry(flags.PluginDir)
pluginRegistry, err := operator.NewPluginRegistry(flags.PluginDir)
if err != nil {
logger.Errorw("Failed to load custom plugin registry", zap.Any("error", err))
logger.Errorw("Failed to load plugin registry", zap.Any("error", err))
}

buildContext := pg.BuildContext{
CustomRegistry: customRegistry,
PluginRegistry: pluginRegistry,
Logger: logger,
}

pipeline, err := cfg.Pipeline.BuildPipeline(buildContext)
if err != nil {
logger.Errorw("Failed to build plugin pipeline", zap.Any("error", err))
logger.Errorw("Failed to build operator pipeline", zap.Any("error", err))
os.Exit(1)
}

Expand Down
16 changes: 8 additions & 8 deletions commands/offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"

agent "github.com/observiq/carbon/agent"
"github.com/observiq/carbon/plugin/helper"
"github.com/observiq/carbon/operator/helper"
"github.com/spf13/cobra"
"go.etcd.io/bbolt"
)
Expand All @@ -17,7 +17,7 @@ var stdout io.Writer = os.Stdout
func NewOffsetsCmd(rootFlags *RootFlags) *cobra.Command {
offsets := &cobra.Command{
Use: "offsets",
Short: "Manage input plugin offsets",
Short: "Manage input operator offsets",
Args: cobra.NoArgs,
Run: func(command *cobra.Command, args []string) {
stdout.Write([]byte("No offsets subcommand specified. See `carbon offsets help` for details\n"))
Expand All @@ -35,7 +35,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {
var all bool

offsetsClear := &cobra.Command{
Use: "clear [flags] [plugin_ids]",
Use: "clear [flags] [operator_ids]",
Short: "Clear persisted offsets from the database",
Args: cobra.ArbitraryArgs,
Run: func(command *cobra.Command, args []string) {
Expand All @@ -46,7 +46,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {

if all {
if len(args) != 0 {
stdout.Write([]byte("Providing a list of plugin IDs does nothing with the --all flag\n"))
stdout.Write([]byte("Providing a list of operator IDs does nothing with the --all flag\n"))
}

err := db.Update(func(tx *bbolt.Tx) error {
Expand All @@ -59,18 +59,18 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {
exitOnErr("Failed to delete offsets", err)
} else {
if len(args) == 0 {
stdout.Write([]byte("Must either specify a list of plugins or the --all flag\n"))
stdout.Write([]byte("Must either specify a list of operators or the --all flag\n"))
os.Exit(1)
}

for _, pluginID := range args {
for _, operatorID := range args {
err = db.Update(func(tx *bbolt.Tx) error {
offsetBucket := tx.Bucket(helper.OffsetsBucket)
if offsetBucket == nil {
return nil
}

return offsetBucket.DeleteBucket([]byte(pluginID))
return offsetBucket.DeleteBucket([]byte(operatorID))
})
exitOnErr("Failed to delete offsets", err)
}
Expand All @@ -87,7 +87,7 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command {
func NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command {
offsetsList := &cobra.Command{
Use: "list",
Short: "List plugins with persisted offsets",
Short: "List operators with persisted offsets",
Args: cobra.NoArgs,
Run: func(command *cobra.Command, args []string) {
db, err := agent.OpenDatabase(rootFlags.DatabaseFile)
Expand Down
16 changes: 8 additions & 8 deletions commands/offsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"testing"

agent "github.com/observiq/carbon/agent"
"github.com/observiq/carbon/plugin/helper"
"github.com/observiq/carbon/operator/helper"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
Expand All @@ -33,15 +33,15 @@ func TestOffsets(t *testing.T) {
bucket, err := tx.CreateBucketIfNotExists(helper.OffsetsBucket)
require.NoError(t, err)

_, err = bucket.CreateBucket([]byte("$.testpluginid1"))
_, err = bucket.CreateBucket([]byte("$.testoperatorid1"))
require.NoError(t, err)
_, err = bucket.CreateBucket([]byte("$.testpluginid2"))
_, err = bucket.CreateBucket([]byte("$.testoperatorid2"))
require.NoError(t, err)
return nil
})
db.Close()

// check that offsets list actually lists the plugin
// check that offsets list actually lists the operator
offsetsList := NewRootCmd()
offsetsList.SetArgs([]string{
"offsets", "list",
Expand All @@ -51,24 +51,24 @@ func TestOffsets(t *testing.T) {

err = offsetsList.Execute()
require.NoError(t, err)
require.Equal(t, "$.testpluginid1\n$.testpluginid2\n", buf.String())
require.Equal(t, "$.testoperatorid1\n$.testoperatorid2\n", buf.String())

// clear the offsets
offsetsClear := NewRootCmd()
offsetsClear.SetArgs([]string{
"offsets", "clear",
"--database", databasePath,
"--config", configPath,
"$.testpluginid2",
"$.testoperatorid2",
})

err = offsetsClear.Execute()
require.NoError(t, err)

// Check that offsets list only shows uncleared plugin id
// Check that offsets list only shows uncleared operator id
buf.Reset()
err = offsetsList.Execute()
require.NoError(t, err)
require.Equal(t, "$.testpluginid1\n", buf.String())
require.Equal(t, "$.testoperatorid1\n", buf.String())

}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/observiq/ctimefmt v1.0.0
github.com/pkg/errors v0.9.1 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.5.1
go.etcd.io/bbolt v1.3.4
go.uber.org/zap v1.15.0
Expand All @@ -34,7 +33,7 @@ require (
google.golang.org/grpc v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.3.0
honnef.co/go/tools v0.0.1-2020.1.3
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
k8s.io/apimachinery v0.18.4
k8s.io/client-go v0.18.4
)
Loading

0 comments on commit b1e749e

Please sign in to comment.