Skip to content

Commit

Permalink
Shim refactor to support processors and output
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka committed Jul 2, 2020
1 parent decd656 commit df26b03
Show file tree
Hide file tree
Showing 19 changed files with 1,032 additions and 85 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
- [#7726](https://github.com/influxdata/telegraf/pull/7726): Add laundry to mem plugin on FreeBSD.
- [#7762](https://github.com/influxdata/telegraf/pull/7762): Allow per input overriding of collection_jitter and precision.
- [#7686](https://github.com/influxdata/telegraf/pull/7686): Improve performance of procstat: Up to 40/120x better performance.
- [#7677](https://github.com/influxdata/telegraf/pull/7677): Expand execd shim support for processor and outputs.

#### Bugfixes

Expand Down
63 changes: 63 additions & 0 deletions plugins/common/shim/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Telegraf Execd Go Shim

The goal of this _shim_ is to make it trivial to extract an internal input,
processor, or output plugin from the main Telegraf repo out to a stand-alone
repo. This allows anyone to build and run it as a separate app using one of the
execd plugins:
- [inputs.execd](/plugins/inputs/execd)
- [processors.execd](/plugins/processors/execd)
- [outputs.execd](/plugins/outputs/execd)

## Steps to externalize a plugin

1. Move the project to an external repo, it's recommended to preserve the path
structure, (but not strictly necessary). eg if your plugin was at
`plugins/inputs/cpu`, it's recommended that it also be under `plugins/inputs/cpu`
in the new repo. For a further example of what this might look like, take a
look at [ssoroka/rand](https://github.com/ssoroka/rand) or
[danielnelson/telegraf-plugins](https://github.com/danielnelson/telegraf-plugins)
1. Copy [main.go](./example/cmd/main.go) into your project under the `cmd` folder.
This will be the entrypoint to the plugin when run as a stand-alone program, and
it will call the shim code for you to make that happen. It's recommended to
have only one plugin per repo, as the shim is not designed to run multiple
plugins at the same time (it would vastly complicate things).
1. Edit the main.go file to import your plugin. Within Telegraf this would have
been done in an all.go file, but here we don't split the two apart, and the change
just goes in the top of main.go. If you skip this step, your plugin will do nothing.
eg: `_ "github.com/me/my-plugin-telegraf/plugins/inputs/cpu"`
1. Optionally add a [plugin.conf](./example/cmd/plugin.conf) for configuration
specific to your plugin. Note that this config file **must be separate from the
rest of the config for Telegraf, and must not be in a shared directory where
Telegraf is expecting to load all configs**. If Telegraf reads this config file
it will not know which plugin it relates to. Telegraf instead uses an execd config
block to look for this plugin.

## Steps to build and run your plugin

1. Build the cmd/main.go. For my rand project this looks like `go build -o rand cmd/main.go`
1. If you're building an input, you can test out the binary just by running it.
eg `./rand -config plugin.conf`
Depending on your polling settings and whether you implemented a service plugin or
an input gathering plugin, you may see data right away, or you may have to hit enter
first, or wait for your poll duration to elapse, but the metrics will be written to
STDOUT. Ctrl-C to end your test.
If you're testig a processor or output manually, you can still do this but you
will need to feed valid metrics in on STDIN to verify that it is doing what you
want. This can be a very valuable debugging technique before hooking it up to
Telegraf.
1. Configure Telegraf to call your new plugin binary. For an input, this would
look something like:

```
[[inputs.execd]]
command = ["/path/to/rand", "-config", "/path/to/plugin.conf"]
signal = "none"
```

Refer to the execd plugin readmes for more information.

## Congratulations!

You've done it! Consider publishing your plugin to github and open a Pull Request
back to the Telegraf repo letting us know about the availability of your
[external plugin](https://github.com/influxdata/telegraf/blob/master/EXTERNAL_PLUGINS.md).
163 changes: 163 additions & 0 deletions plugins/common/shim/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package shim

import (
"errors"
"fmt"
"io/ioutil"
"os"

"github.com/BurntSushi/toml"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/processors"
)

type config struct {
Inputs map[string][]toml.Primitive
Processors map[string][]toml.Primitive
Outputs map[string][]toml.Primitive
}

type loadedConfig struct {
Input telegraf.Input
Processor telegraf.StreamingProcessor
Output telegraf.Output
}

// LoadConfig Adds plugins to the shim
func (s *Shim) LoadConfig(filePath *string) error {
conf, err := LoadConfig(filePath)
if err != nil {
return err
}
if conf.Input != nil {
if err = s.AddInput(conf.Input); err != nil {
return fmt.Errorf("Failed to add Input: %w", err)
}
} else if conf.Processor != nil {
if err = s.AddStreamingProcessor(conf.Processor); err != nil {
return fmt.Errorf("Failed to add Processor: %w", err)
}
} else if conf.Output != nil {
if err = s.AddOutput(conf.Output); err != nil {
return fmt.Errorf("Failed to add Output: %w", err)
}
}
return nil
}

// LoadConfig loads the config and returns inputs that later need to be loaded.
func LoadConfig(filePath *string) (loaded loadedConfig, err error) {
var data string
conf := config{}
if filePath != nil && *filePath != "" {

b, err := ioutil.ReadFile(*filePath)
if err != nil {
return loadedConfig{}, err
}

data = expandEnvVars(b)

} else {
conf, err = DefaultImportedPlugins()
if err != nil {
return loadedConfig{}, err
}
}

md, err := toml.Decode(data, &conf)
if err != nil {
return loadedConfig{}, err
}

return createPluginsWithTomlConfig(md, conf)
}

func expandEnvVars(contents []byte) string {
return os.Expand(string(contents), getEnv)
}

func getEnv(key string) string {
v := os.Getenv(key)

return envVarEscaper.Replace(v)
}

func createPluginsWithTomlConfig(md toml.MetaData, conf config) (loadedConfig, error) {
loadedConf := loadedConfig{}

for name, primitives := range conf.Inputs {
creator, ok := inputs.Inputs[name]
if !ok {
return loadedConf, errors.New("unknown input " + name)
}

plugin := creator()
if len(primitives) > 0 {
primitive := primitives[0]
if err := md.PrimitiveDecode(primitive, plugin); err != nil {
return loadedConf, err
}
}

loadedConf.Input = plugin
break
}

for name, primitives := range conf.Processors {
creator, ok := processors.Processors[name]
if !ok {
return loadedConf, errors.New("unknown processor " + name)
}

plugin := creator()
if len(primitives) > 0 {
primitive := primitives[0]
if err := md.PrimitiveDecode(primitive, plugin); err != nil {
return loadedConf, err
}
}
loadedConf.Processor = plugin
break
}

for name, primitives := range conf.Outputs {
creator, ok := outputs.Outputs[name]
if !ok {
return loadedConf, errors.New("unknown output " + name)
}

plugin := creator()
if len(primitives) > 0 {
primitive := primitives[0]
if err := md.PrimitiveDecode(primitive, plugin); err != nil {
return loadedConf, err
}
}
loadedConf.Output = plugin
break
}
return loadedConf, nil
}

// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and
// have registered themselves with the registry. This makes loading plugins
// without having to define a config dead easy.
func DefaultImportedPlugins() (config, error) {
conf := config{}
for name := range inputs.Inputs {
conf.Inputs[name] = []toml.Primitive{}
return conf, nil
}
for name := range processors.Processors {
conf.Processors[name] = []toml.Primitive{}
return conf, nil
}
for name := range outputs.Outputs {
conf.Outputs[name] = []toml.Primitive{}
return conf, nil
}
return conf, nil
}
29 changes: 29 additions & 0 deletions plugins/common/shim/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package shim

import (
"os"
"testing"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/stretchr/testify/require"
)

func TestLoadConfig(t *testing.T) {
os.Setenv("SECRET_TOKEN", "xxxxxxxxxx")
os.Setenv("SECRET_VALUE", `test"\test`)

inputs.Add("test", func() telegraf.Input {
return &serviceInput{}
})

c := "./testdata/plugin.conf"
conf, err := LoadConfig(&c)
require.NoError(t, err)

inp := conf.Input.(*serviceInput)

require.Equal(t, "awesome name", inp.ServiceName)
require.Equal(t, "xxxxxxxxxx", inp.SecretToken)
require.Equal(t, `test"\test`, inp.SecretValue)
}
60 changes: 60 additions & 0 deletions plugins/common/shim/example/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"flag"
"fmt"
"os"
"time"

// TODO: import your plugins
// _ "github.com/my_github_user/my_plugin_repo/plugins/inputs/mypluginname"

"github.com/influxdata/telegraf/plugins/common/shim"
)

var pollInterval = flag.Duration("poll_interval", 1*time.Second, "how often to send metrics")
var pollIntervalDisabled = flag.Bool("poll_interval_disabled", false, "how often to send metrics")
var configFile = flag.String("config", "", "path to the config file for this plugin")
var err error

// This is designed to be simple; Just change the import above and you're good.
//
// However, if you want to do all your config in code, you can like so:
//
// // initialize your plugin with any settngs you want
// myInput := &mypluginname.MyPlugin{
// DefaultSettingHere: 3,
// }
//
// shim := shim.New()
//
// shim.AddInput(myInput)
//
// // now the shim.Run() call as below.
//
func main() {
// parse command line options
flag.Parse()
if *pollIntervalDisabled {
*pollInterval = shim.PollIntervalDisabled
}

// create the shim. This is what will run your plugins.
shim := shim.New()

// If no config is specified, all imported plugins are loaded.
// otherwise follow what the config asks for.
// Check for settings from a config toml file,
// (or just use whatever plugins were imported above)
err = shim.LoadConfig(configFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Err loading input: %s\n", err)
os.Exit(1)
}

// run the input plugin(s) until stdin closes or we receive a termination signal
if err := shim.Run(*pollInterval); err != nil {
fmt.Fprintf(os.Stderr, "Err: %s\n", err)
os.Exit(1)
}
}
2 changes: 2 additions & 0 deletions plugins/common/shim/example/cmd/plugin.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[[inputs.my_plugin_name]]
value_name = "value"
Loading

0 comments on commit df26b03

Please sign in to comment.