diff --git a/CHANGELOG.md b/CHANGELOG.md index 1154a8b102f83..708b915705668 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ - [#7726](https://github.com/influxdata/telegraf/pull/7726): Add laundry to mem plugin on FreeBSD. - [#7762](https://github.com/influxdata/telegraf/pull/7762): Allow per input overriding of collection_jitter and precision. - [#7686](https://github.com/influxdata/telegraf/pull/7686): Improve performance of procstat: Up to 40/120x better performance. +- [#7677](https://github.com/influxdata/telegraf/pull/7677): Expand execd shim support for processor and outputs. #### Bugfixes diff --git a/plugins/common/shim/README.md b/plugins/common/shim/README.md new file mode 100644 index 0000000000000..80235fb034056 --- /dev/null +++ b/plugins/common/shim/README.md @@ -0,0 +1,63 @@ +# Telegraf Execd Go Shim + +The goal of this _shim_ is to make it trivial to extract an internal input, +processor, or output plugin from the main Telegraf repo out to a stand-alone +repo. This allows anyone to build and run it as a separate app using one of the +execd plugins: +- [inputs.execd](/plugins/inputs/execd) +- [processors.execd](/plugins/processors/execd) +- [outputs.execd](/plugins/outputs/execd) + +## Steps to externalize a plugin + +1. Move the project to an external repo, it's recommended to preserve the path + structure, (but not strictly necessary). eg if your plugin was at + `plugins/inputs/cpu`, it's recommended that it also be under `plugins/inputs/cpu` + in the new repo. For a further example of what this might look like, take a + look at [ssoroka/rand](https://github.com/ssoroka/rand) or + [danielnelson/telegraf-plugins](https://github.com/danielnelson/telegraf-plugins) +1. Copy [main.go](./example/cmd/main.go) into your project under the `cmd` folder. + This will be the entrypoint to the plugin when run as a stand-alone program, and + it will call the shim code for you to make that happen. It's recommended to + have only one plugin per repo, as the shim is not designed to run multiple + plugins at the same time (it would vastly complicate things). +1. Edit the main.go file to import your plugin. Within Telegraf this would have + been done in an all.go file, but here we don't split the two apart, and the change + just goes in the top of main.go. If you skip this step, your plugin will do nothing. + eg: `_ "github.com/me/my-plugin-telegraf/plugins/inputs/cpu"` +1. Optionally add a [plugin.conf](./example/cmd/plugin.conf) for configuration + specific to your plugin. Note that this config file **must be separate from the + rest of the config for Telegraf, and must not be in a shared directory where + Telegraf is expecting to load all configs**. If Telegraf reads this config file + it will not know which plugin it relates to. Telegraf instead uses an execd config + block to look for this plugin. + +## Steps to build and run your plugin + +1. Build the cmd/main.go. For my rand project this looks like `go build -o rand cmd/main.go` +1. If you're building an input, you can test out the binary just by running it. + eg `./rand -config plugin.conf` + Depending on your polling settings and whether you implemented a service plugin or + an input gathering plugin, you may see data right away, or you may have to hit enter + first, or wait for your poll duration to elapse, but the metrics will be written to + STDOUT. Ctrl-C to end your test. + If you're testig a processor or output manually, you can still do this but you + will need to feed valid metrics in on STDIN to verify that it is doing what you + want. This can be a very valuable debugging technique before hooking it up to + Telegraf. +1. Configure Telegraf to call your new plugin binary. For an input, this would + look something like: + +``` +[[inputs.execd]] + command = ["/path/to/rand", "-config", "/path/to/plugin.conf"] + signal = "none" +``` + + Refer to the execd plugin readmes for more information. + +## Congratulations! + +You've done it! Consider publishing your plugin to github and open a Pull Request +back to the Telegraf repo letting us know about the availability of your +[external plugin](https://github.com/influxdata/telegraf/blob/master/EXTERNAL_PLUGINS.md). \ No newline at end of file diff --git a/plugins/common/shim/config.go b/plugins/common/shim/config.go new file mode 100644 index 0000000000000..33ae8ab2cfb55 --- /dev/null +++ b/plugins/common/shim/config.go @@ -0,0 +1,163 @@ +package shim + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + + "github.com/BurntSushi/toml" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/processors" +) + +type config struct { + Inputs map[string][]toml.Primitive + Processors map[string][]toml.Primitive + Outputs map[string][]toml.Primitive +} + +type loadedConfig struct { + Input telegraf.Input + Processor telegraf.StreamingProcessor + Output telegraf.Output +} + +// LoadConfig Adds plugins to the shim +func (s *Shim) LoadConfig(filePath *string) error { + conf, err := LoadConfig(filePath) + if err != nil { + return err + } + if conf.Input != nil { + if err = s.AddInput(conf.Input); err != nil { + return fmt.Errorf("Failed to add Input: %w", err) + } + } else if conf.Processor != nil { + if err = s.AddStreamingProcessor(conf.Processor); err != nil { + return fmt.Errorf("Failed to add Processor: %w", err) + } + } else if conf.Output != nil { + if err = s.AddOutput(conf.Output); err != nil { + return fmt.Errorf("Failed to add Output: %w", err) + } + } + return nil +} + +// LoadConfig loads the config and returns inputs that later need to be loaded. +func LoadConfig(filePath *string) (loaded loadedConfig, err error) { + var data string + conf := config{} + if filePath != nil && *filePath != "" { + + b, err := ioutil.ReadFile(*filePath) + if err != nil { + return loadedConfig{}, err + } + + data = expandEnvVars(b) + + } else { + conf, err = DefaultImportedPlugins() + if err != nil { + return loadedConfig{}, err + } + } + + md, err := toml.Decode(data, &conf) + if err != nil { + return loadedConfig{}, err + } + + return createPluginsWithTomlConfig(md, conf) +} + +func expandEnvVars(contents []byte) string { + return os.Expand(string(contents), getEnv) +} + +func getEnv(key string) string { + v := os.Getenv(key) + + return envVarEscaper.Replace(v) +} + +func createPluginsWithTomlConfig(md toml.MetaData, conf config) (loadedConfig, error) { + loadedConf := loadedConfig{} + + for name, primitives := range conf.Inputs { + creator, ok := inputs.Inputs[name] + if !ok { + return loadedConf, errors.New("unknown input " + name) + } + + plugin := creator() + if len(primitives) > 0 { + primitive := primitives[0] + if err := md.PrimitiveDecode(primitive, plugin); err != nil { + return loadedConf, err + } + } + + loadedConf.Input = plugin + break + } + + for name, primitives := range conf.Processors { + creator, ok := processors.Processors[name] + if !ok { + return loadedConf, errors.New("unknown processor " + name) + } + + plugin := creator() + if len(primitives) > 0 { + primitive := primitives[0] + if err := md.PrimitiveDecode(primitive, plugin); err != nil { + return loadedConf, err + } + } + loadedConf.Processor = plugin + break + } + + for name, primitives := range conf.Outputs { + creator, ok := outputs.Outputs[name] + if !ok { + return loadedConf, errors.New("unknown output " + name) + } + + plugin := creator() + if len(primitives) > 0 { + primitive := primitives[0] + if err := md.PrimitiveDecode(primitive, plugin); err != nil { + return loadedConf, err + } + } + loadedConf.Output = plugin + break + } + return loadedConf, nil +} + +// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and +// have registered themselves with the registry. This makes loading plugins +// without having to define a config dead easy. +func DefaultImportedPlugins() (config, error) { + conf := config{} + for name := range inputs.Inputs { + conf.Inputs[name] = []toml.Primitive{} + return conf, nil + } + for name := range processors.Processors { + conf.Processors[name] = []toml.Primitive{} + return conf, nil + } + for name := range outputs.Outputs { + conf.Outputs[name] = []toml.Primitive{} + return conf, nil + } + return conf, nil +} diff --git a/plugins/common/shim/config_test.go b/plugins/common/shim/config_test.go new file mode 100644 index 0000000000000..1a6e0bc32e6cd --- /dev/null +++ b/plugins/common/shim/config_test.go @@ -0,0 +1,29 @@ +package shim + +import ( + "os" + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/stretchr/testify/require" +) + +func TestLoadConfig(t *testing.T) { + os.Setenv("SECRET_TOKEN", "xxxxxxxxxx") + os.Setenv("SECRET_VALUE", `test"\test`) + + inputs.Add("test", func() telegraf.Input { + return &serviceInput{} + }) + + c := "./testdata/plugin.conf" + conf, err := LoadConfig(&c) + require.NoError(t, err) + + inp := conf.Input.(*serviceInput) + + require.Equal(t, "awesome name", inp.ServiceName) + require.Equal(t, "xxxxxxxxxx", inp.SecretToken) + require.Equal(t, `test"\test`, inp.SecretValue) +} diff --git a/plugins/common/shim/example/cmd/main.go b/plugins/common/shim/example/cmd/main.go new file mode 100644 index 0000000000000..4f51f7f878fb3 --- /dev/null +++ b/plugins/common/shim/example/cmd/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + // TODO: import your plugins + // _ "github.com/my_github_user/my_plugin_repo/plugins/inputs/mypluginname" + + "github.com/influxdata/telegraf/plugins/common/shim" +) + +var pollInterval = flag.Duration("poll_interval", 1*time.Second, "how often to send metrics") +var pollIntervalDisabled = flag.Bool("poll_interval_disabled", false, "how often to send metrics") +var configFile = flag.String("config", "", "path to the config file for this plugin") +var err error + +// This is designed to be simple; Just change the import above and you're good. +// +// However, if you want to do all your config in code, you can like so: +// +// // initialize your plugin with any settngs you want +// myInput := &mypluginname.MyPlugin{ +// DefaultSettingHere: 3, +// } +// +// shim := shim.New() +// +// shim.AddInput(myInput) +// +// // now the shim.Run() call as below. +// +func main() { + // parse command line options + flag.Parse() + if *pollIntervalDisabled { + *pollInterval = shim.PollIntervalDisabled + } + + // create the shim. This is what will run your plugins. + shim := shim.New() + + // If no config is specified, all imported plugins are loaded. + // otherwise follow what the config asks for. + // Check for settings from a config toml file, + // (or just use whatever plugins were imported above) + err = shim.LoadConfig(configFile) + if err != nil { + fmt.Fprintf(os.Stderr, "Err loading input: %s\n", err) + os.Exit(1) + } + + // run the input plugin(s) until stdin closes or we receive a termination signal + if err := shim.Run(*pollInterval); err != nil { + fmt.Fprintf(os.Stderr, "Err: %s\n", err) + os.Exit(1) + } +} diff --git a/plugins/common/shim/example/cmd/plugin.conf b/plugins/common/shim/example/cmd/plugin.conf new file mode 100644 index 0000000000000..53f89a55946ca --- /dev/null +++ b/plugins/common/shim/example/cmd/plugin.conf @@ -0,0 +1,2 @@ +[[inputs.my_plugin_name]] + value_name = "value" diff --git a/plugins/common/shim/goshim.go b/plugins/common/shim/goshim.go new file mode 100644 index 0000000000000..2490967eebb48 --- /dev/null +++ b/plugins/common/shim/goshim.go @@ -0,0 +1,131 @@ +package shim + +import ( + "context" + "fmt" + "io" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers/influx" +) + +type empty struct{} + +var ( + forever = 100 * 365 * 24 * time.Hour + envVarEscaper = strings.NewReplacer( + `"`, `\"`, + `\`, `\\`, + ) +) + +const ( + // PollIntervalDisabled is used to indicate that you want to disable polling, + // as opposed to duration 0 meaning poll constantly. + PollIntervalDisabled = time.Duration(0) +) + +// Shim allows you to wrap your inputs and run them as if they were part of Telegraf, +// except built externally. +type Shim struct { + Input telegraf.Input + Processor telegraf.StreamingProcessor + Output telegraf.Output + + // streams + stdin io.Reader + stdout io.Writer + stderr io.Writer + + // outgoing metric channel + metricCh chan telegraf.Metric + + // input only + gatherPromptCh chan empty +} + +// New creates a new shim interface +func New() *Shim { + return &Shim{ + metricCh: make(chan telegraf.Metric, 1), + stdin: os.Stdin, + stdout: os.Stdout, + stderr: os.Stderr, + } +} + +func (s *Shim) watchForShutdown(cancel context.CancelFunc) { + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-quit // user-triggered quit + // cancel, but keep looping until the metric channel closes. + cancel() + }() +} + +// Run the input plugins.. +func (s *Shim) Run(pollInterval time.Duration) error { + if s.Input != nil { + err := s.RunInput(pollInterval) + if err != nil { + return fmt.Errorf("RunInput error: %w", err) + } + } else if s.Processor != nil { + err := s.RunProcessor() + if err != nil { + return fmt.Errorf("RunProcessor error: %w", err) + } + } else if s.Output != nil { + err := s.RunOutput() + if err != nil { + return fmt.Errorf("RunOutput error: %w", err) + } + } else { + return fmt.Errorf("Nothing to run") + } + + return nil +} + +func hasQuit(ctx context.Context) bool { + return ctx.Err() != nil +} + +func (s *Shim) writeProcessedMetrics() error { + serializer := influx.NewSerializer() + for { + select { + case m, open := <-s.metricCh: + if !open { + return nil + } + b, err := serializer.Serialize(m) + if err != nil { + return fmt.Errorf("failed to serialize metric: %s", err) + } + // Write this to stdout + fmt.Fprint(s.stdout, string(b)) + } + } +} + +// LogName satisfies the MetricMaker interface +func (s *Shim) LogName() string { + return "" +} + +// MakeMetric satisfies the MetricMaker interface +func (s *Shim) MakeMetric(m telegraf.Metric) telegraf.Metric { + return m // don't need to do anything to it. +} + +// Log satisfies the MetricMaker interface +func (s *Shim) Log() telegraf.Logger { + return nil +} diff --git a/plugins/common/shim/input.go b/plugins/common/shim/input.go new file mode 100644 index 0000000000000..006f2ad046226 --- /dev/null +++ b/plugins/common/shim/input.go @@ -0,0 +1,108 @@ +package shim + +import ( + "bufio" + "context" + "fmt" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/agent" +) + +// AddInput adds the input to the shim. Later calls to Run() will run this input. +func (s *Shim) AddInput(input telegraf.Input) error { + if p, ok := input.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return fmt.Errorf("failed to init input: %s", err) + } + } + + s.Input = input + return nil +} + +func (s *Shim) RunInput(pollInterval time.Duration) error { + // context is used only to close the stdin reader. everything else cascades + // from that point and closes cleanly when it's done. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.watchForShutdown(cancel) + + acc := agent.NewAccumulator(s, s.metricCh) + acc.SetPrecision(time.Nanosecond) + + if serviceInput, ok := s.Input.(telegraf.ServiceInput); ok { + if err := serviceInput.Start(acc); err != nil { + return fmt.Errorf("failed to start input: %s", err) + } + } + s.gatherPromptCh = make(chan empty, 1) + go func() { + s.startGathering(ctx, s.Input, acc, pollInterval) + if serviceInput, ok := s.Input.(telegraf.ServiceInput); ok { + serviceInput.Stop() + } + // closing the metric channel gracefully stops writing to stdout + close(s.metricCh) + }() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + s.writeProcessedMetrics() + wg.Done() + }() + + scanner := bufio.NewScanner(s.stdin) + for scanner.Scan() { + // push a non-blocking message to trigger metric collection. + s.pushCollectMetricsRequest() + } + + cancel() // cancel gracefully stops gathering + wg.Wait() // wait for writing to stdout to finish + return nil +} + +func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, pollInterval time.Duration) { + if pollInterval == PollIntervalDisabled { + pollInterval = forever + } + t := time.NewTicker(pollInterval) + defer t.Stop() + for { + // give priority to stopping. + if hasQuit(ctx) { + return + } + // see what's up + select { + case <-ctx.Done(): + return + case <-s.gatherPromptCh: + if err := input.Gather(acc); err != nil { + fmt.Fprintf(s.stderr, "failed to gather metrics: %s\n", err) + } + case <-t.C: + if err := input.Gather(acc); err != nil { + fmt.Fprintf(s.stderr, "failed to gather metrics: %s\n", err) + } + } + } +} + +// pushCollectMetricsRequest pushes a non-blocking (nil) message to the +// gatherPromptCh channel to trigger metric collection. +// The channel is defined with a buffer of 1, so while it's full, subsequent +// requests are discarded. +func (s *Shim) pushCollectMetricsRequest() { + // push a message out to each channel to collect metrics. don't block. + select { + case s.gatherPromptCh <- empty{}: + default: + } +} diff --git a/plugins/common/shim/input_test.go b/plugins/common/shim/input_test.go new file mode 100644 index 0000000000000..709ac79ef4fdc --- /dev/null +++ b/plugins/common/shim/input_test.go @@ -0,0 +1,141 @@ +package shim + +import ( + "bufio" + "io" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" +) + +func TestInputShimTimer(t *testing.T) { + stdoutReader, stdoutWriter := io.Pipe() + + stdin, _ := io.Pipe() // hold the stdin pipe open + + metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond, stdin, stdoutWriter, nil) + + <-metricProcessed + r := bufio.NewReader(stdoutReader) + out, err := r.ReadString('\n') + require.NoError(t, err) + require.Contains(t, out, "\n") + metricLine := strings.Split(out, "\n")[0] + require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) +} + +func TestInputShimStdinSignalingWorks(t *testing.T) { + stdinReader, stdinWriter := io.Pipe() + stdoutReader, stdoutWriter := io.Pipe() + + metricProcessed, exited := runInputPlugin(t, 40*time.Second, stdinReader, stdoutWriter, nil) + + stdinWriter.Write([]byte("\n")) + + <-metricProcessed + + r := bufio.NewReader(stdoutReader) + out, err := r.ReadString('\n') + require.NoError(t, err) + require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out) + + stdinWriter.Close() + go ioutil.ReadAll(r) + // check that it exits cleanly + <-exited +} + +func runInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) { + metricProcessed = make(chan bool, 1) + exited = make(chan bool, 1) + inp := &testInput{ + metricProcessed: metricProcessed, + } + + shim := New() + if stdin != nil { + shim.stdin = stdin + } + if stdout != nil { + shim.stdout = stdout + } + if stderr != nil { + shim.stderr = stderr + } + shim.AddInput(inp) + go func() { + err := shim.Run(interval) + require.NoError(t, err) + exited <- true + }() + return metricProcessed, exited +} + +type testInput struct { + metricProcessed chan bool +} + +func (i *testInput) SampleConfig() string { + return "" +} + +func (i *testInput) Description() string { + return "" +} + +func (i *testInput) Gather(acc telegraf.Accumulator) error { + acc.AddFields("measurement", + map[string]interface{}{ + "field": 1, + }, + map[string]string{ + "tag": "tag", + }, time.Unix(1234, 5678)) + i.metricProcessed <- true + return nil +} + +func (i *testInput) Start(acc telegraf.Accumulator) error { + return nil +} + +func (i *testInput) Stop() { +} + +type serviceInput struct { + ServiceName string `toml:"service_name"` + SecretToken string `toml:"secret_token"` + SecretValue string `toml:"secret_value"` +} + +func (i *serviceInput) SampleConfig() string { + return "" +} + +func (i *serviceInput) Description() string { + return "" +} + +func (i *serviceInput) Gather(acc telegraf.Accumulator) error { + acc.AddFields("measurement", + map[string]interface{}{ + "field": 1, + }, + map[string]string{ + "tag": "tag", + }, time.Unix(1234, 5678)) + + return nil +} + +func (i *serviceInput) Start(acc telegraf.Accumulator) error { + return nil +} + +func (i *serviceInput) Stop() { +} diff --git a/plugins/common/shim/output.go b/plugins/common/shim/output.go new file mode 100644 index 0000000000000..ac2f16619cb37 --- /dev/null +++ b/plugins/common/shim/output.go @@ -0,0 +1,51 @@ +package shim + +import ( + "bufio" + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" +) + +// AddOutput adds the input to the shim. Later calls to Run() will run this. +func (s *Shim) AddOutput(output telegraf.Output) error { + if p, ok := output.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return fmt.Errorf("failed to init input: %s", err) + } + } + + s.Output = output + return nil +} + +func (s *Shim) RunOutput() error { + parser, err := parsers.NewInfluxParser() + if err != nil { + return fmt.Errorf("Failed to create new parser: %w", err) + } + + err = s.Output.Connect() + if err != nil { + return fmt.Errorf("failed to start processor: %w", err) + } + defer s.Output.Close() + + var m telegraf.Metric + + scanner := bufio.NewScanner(s.stdin) + for scanner.Scan() { + m, err = parser.ParseLine(scanner.Text()) + if err != nil { + fmt.Fprintf(s.stderr, "Failed to parse metric: %s\n", err) + continue + } + if err = s.Output.Write([]telegraf.Metric{m}); err != nil { + fmt.Fprintf(s.stderr, "Failed to write metric: %s\n", err) + } + } + + return nil +} diff --git a/plugins/common/shim/output_test.go b/plugins/common/shim/output_test.go new file mode 100644 index 0000000000000..5a74d59edb240 --- /dev/null +++ b/plugins/common/shim/output_test.go @@ -0,0 +1,82 @@ +package shim + +import ( + "io" + "sync" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestOutputShim(t *testing.T) { + o := &testOutput{} + + stdinReader, stdinWriter := io.Pipe() + + s := New() + s.stdin = stdinReader + err := s.AddOutput(o) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + err := s.RunOutput() + require.NoError(t, err) + wg.Done() + }() + + serializer, _ := serializers.NewInfluxSerializer() + + m, _ := metric.New("thing", + map[string]string{ + "a": "b", + }, + map[string]interface{}{ + "v": 1, + }, + time.Now(), + ) + b, err := serializer.Serialize(m) + require.NoError(t, err) + _, err = stdinWriter.Write(b) + require.NoError(t, err) + err = stdinWriter.Close() + require.NoError(t, err) + + wg.Wait() + + require.Len(t, o.MetricsWritten, 1) + mOut := o.MetricsWritten[0] + + testutil.RequireMetricEqual(t, m, mOut) +} + +type testOutput struct { + MetricsWritten []telegraf.Metric +} + +func (o *testOutput) Connect() error { + return nil +} +func (o *testOutput) Close() error { + return nil +} +func (o *testOutput) Write(metrics []telegraf.Metric) error { + o.MetricsWritten = append(o.MetricsWritten, metrics...) + return nil +} + +func (o *testOutput) SampleConfig() string { + return "" +} + +func (o *testOutput) Description() string { + return "" +} diff --git a/plugins/common/shim/processor.go b/plugins/common/shim/processor.go new file mode 100644 index 0000000000000..75d8fc69432bf --- /dev/null +++ b/plugins/common/shim/processor.go @@ -0,0 +1,69 @@ +package shim + +import ( + "bufio" + "fmt" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/agent" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/processors" +) + +// AddProcessor adds the processor to the shim. Later calls to Run() will run this. +func (s *Shim) AddProcessor(processor telegraf.Processor) error { + p := processors.NewStreamingProcessorFromProcessor(processor) + return s.AddStreamingProcessor(p) +} + +// AddStreamingProcessor adds the processor to the shim. Later calls to Run() will run this. +func (s *Shim) AddStreamingProcessor(processor telegraf.StreamingProcessor) error { + if p, ok := processor.(telegraf.Initializer); ok { + err := p.Init() + if err != nil { + return fmt.Errorf("failed to init input: %s", err) + } + } + + s.Processor = processor + return nil +} + +func (s *Shim) RunProcessor() error { + acc := agent.NewAccumulator(s, s.metricCh) + acc.SetPrecision(time.Nanosecond) + + parser, err := parsers.NewInfluxParser() + if err != nil { + return fmt.Errorf("Failed to create new parser: %w", err) + } + + err = s.Processor.Start(acc) + if err != nil { + return fmt.Errorf("failed to start processor: %w", err) + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + s.writeProcessedMetrics() + wg.Done() + }() + + scanner := bufio.NewScanner(s.stdin) + for scanner.Scan() { + m, err := parser.ParseLine(scanner.Text()) + if err != nil { + fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err) + continue + } + s.Processor.Add(m, acc) + } + + close(s.metricCh) + s.Processor.Stop() + wg.Wait() + return nil +} diff --git a/plugins/common/shim/processor_test.go b/plugins/common/shim/processor_test.go new file mode 100644 index 0000000000000..b4cf01ae0236f --- /dev/null +++ b/plugins/common/shim/processor_test.go @@ -0,0 +1,88 @@ +package shim + +import ( + "bufio" + "io" + "io/ioutil" + "sync" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/stretchr/testify/require" +) + +func TestProcessorShim(t *testing.T) { + p := &testProcessor{} + + stdinReader, stdinWriter := io.Pipe() + stdoutReader, stdoutWriter := io.Pipe() + + s := New() + // inject test into shim + s.stdin = stdinReader + s.stdout = stdoutWriter + err := s.AddProcessor(p) + require.NoError(t, err) + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + err := s.RunProcessor() + require.NoError(t, err) + wg.Done() + }() + + serializer, _ := serializers.NewInfluxSerializer() + parser, _ := parsers.NewInfluxParser() + + m, _ := metric.New("thing", + map[string]string{ + "a": "b", + }, + map[string]interface{}{ + "v": 1, + }, + time.Now(), + ) + b, err := serializer.Serialize(m) + require.NoError(t, err) + _, err = stdinWriter.Write(b) + require.NoError(t, err) + err = stdinWriter.Close() + require.NoError(t, err) + + r := bufio.NewReader(stdoutReader) + out, err := r.ReadString('\n') + require.NoError(t, err) + mOut, err := parser.ParseLine(out) + require.NoError(t, err) + + val, ok := mOut.GetTag("hi") + require.True(t, ok) + require.Equal(t, "mom", val) + + go ioutil.ReadAll(r) + wg.Wait() +} + +type testProcessor struct{} + +func (p *testProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + metric.AddTag("hi", "mom") + } + return in +} + +func (p *testProcessor) SampleConfig() string { + return "" +} + +func (p *testProcessor) Description() string { + return "" +} diff --git a/plugins/common/shim/testdata/plugin.conf b/plugins/common/shim/testdata/plugin.conf new file mode 100644 index 0000000000000..78dbb33a90683 --- /dev/null +++ b/plugins/common/shim/testdata/plugin.conf @@ -0,0 +1,4 @@ +[[inputs.test]] + service_name = "awesome name" + secret_token = "${SECRET_TOKEN}" + secret_value = "$SECRET_VALUE" diff --git a/plugins/inputs/execd/shim/README.md b/plugins/inputs/execd/shim/README.md index 3bdb69f92a6a9..0e7fa9cd7e905 100644 --- a/plugins/inputs/execd/shim/README.md +++ b/plugins/inputs/execd/shim/README.md @@ -1,48 +1,3 @@ # Telegraf Execd Go Shim -The goal of this _shim_ is to make it trivial to extract an internal input plugin -out to a stand-alone repo for the purpose of compiling it as a separate app and -running it from the inputs.execd plugin. - -The execd-shim is still experimental and the interface may change in the future. -Especially as the concept expands to processors, aggregators, and outputs. - -## Steps to externalize a plugin - -1. Move the project to an external repo, optionally preserving the - _plugins/inputs/plugin_name_ folder structure. For an example of what this might - look at, take a look at [ssoroka/rand](https://github.com/ssoroka/rand) or - [danielnelson/telegraf-plugins](https://github.com/danielnelson/telegraf-plugins) -1. Copy [main.go](./example/cmd/main.go) into your project under the cmd folder. - This will be the entrypoint to the plugin when run as a stand-alone program, and - it will call the shim code for you to make that happen. -1. Edit the main.go file to import your plugin. Within Telegraf this would have - been done in an all.go file, but here we don't split the two apart, and the change - just goes in the top of main.go. If you skip this step, your plugin will do nothing. -1. Optionally add a [plugin.conf](./example/cmd/plugin.conf) for configuration - specific to your plugin. Note that this config file **must be separate from the - rest of the config for Telegraf, and must not be in a shared directory where - Telegraf is expecting to load all configs**. If Telegraf reads this config file - it will not know which plugin it relates to. - -## Steps to build and run your plugin - -1. Build the cmd/main.go. For my rand project this looks like `go build -o rand cmd/main.go` -1. Test out the binary if you haven't done this yet. eg `./rand -config plugin.conf` - Depending on your polling settings and whether you implemented a service plugin or - an input gathering plugin, you may see data right away, or you may have to hit enter - first, or wait for your poll duration to elapse, but the metrics will be written to - STDOUT. Ctrl-C to end your test. -1. Configure Telegraf to call your new plugin binary. eg: - -``` -[[inputs.execd]] - command = ["/path/to/rand", "-config", "/path/to/plugin.conf"] - signal = "none" -``` - -## Congratulations! - -You've done it! Consider publishing your plugin to github and open a Pull Request -back to the Telegraf repo letting us know about the availability of your -[external plugin](https://github.com/influxdata/telegraf/blob/master/EXTERNAL_PLUGINS.md). \ No newline at end of file +This is deprecated. Please see (/plugins/common/shim/README.md)[https://github.com/influxdata/telegraf/tree/master/plugins/common/shim/README.md] \ No newline at end of file diff --git a/plugins/inputs/execd/shim/goshim.go b/plugins/inputs/execd/shim/goshim.go index d38f17ffdb665..1ea794fb6877d 100644 --- a/plugins/inputs/execd/shim/goshim.go +++ b/plugins/inputs/execd/shim/goshim.go @@ -24,10 +24,8 @@ import ( type empty struct{} var ( - stdout io.Writer = os.Stdout - stdin io.Reader = os.Stdin - forever = 100 * 365 * 24 * time.Hour - envVarEscaper = strings.NewReplacer( + forever = 100 * 365 * 24 * time.Hour + envVarEscaper = strings.NewReplacer( `"`, `\"`, `\`, `\\`, ) @@ -45,11 +43,19 @@ type Shim struct { Inputs []telegraf.Input gatherPromptChans []chan empty metricCh chan telegraf.Metric + + stdin io.Reader + stdout io.Writer + stderr io.Writer } // New creates a new shim interface func New() *Shim { - return &Shim{} + return &Shim{ + stdin: os.Stdin, + stdout: os.Stdout, + stderr: os.Stderr, + } } // AddInput adds the input to the shim. Later calls to Run() will run this input. @@ -108,7 +114,7 @@ func (s *Shim) Run(pollInterval time.Duration) error { s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh) wg.Add(1) // one per input go func(input telegraf.Input) { - startGathering(ctx, input, acc, gatherPromptCh, pollInterval) + s.startGathering(ctx, input, acc, gatherPromptCh, pollInterval) if serviceInput, ok := input.(telegraf.ServiceInput); ok { serviceInput.Stop() } @@ -141,7 +147,7 @@ loop: return fmt.Errorf("failed to serialize metric: %s", err) } // Write this to stdout - fmt.Fprint(stdout, string(b)) + fmt.Fprint(s.stdout, string(b)) } } @@ -163,7 +169,7 @@ func (s *Shim) stdinCollectMetricsPrompt(ctx context.Context, cancel context.Can close(collectMetricsPrompt) }() - scanner := bufio.NewScanner(stdin) + scanner := bufio.NewScanner(s.stdin) // for every line read from stdin, make sure we're not supposed to quit, // then push a message on to the collectMetricsPrompt for scanner.Scan() { @@ -201,7 +207,7 @@ func (s *Shim) collectMetrics(ctx context.Context) { } } -func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) { +func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) { if pollInterval == PollIntervalDisabled { return // don't poll } @@ -218,11 +224,11 @@ func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accu return case <-gatherPromptCh: if err := input.Gather(acc); err != nil { - fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) + fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err) } case <-t.C: if err := input.Gather(acc); err != nil { - fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) + fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err) } } } @@ -269,12 +275,7 @@ func LoadConfig(filePath *string) ([]telegraf.Input, error) { return nil, err } - loadedInputs, err := loadConfigIntoInputs(md, conf.Inputs) - - if len(md.Undecoded()) > 0 { - fmt.Fprintf(stdout, "Some plugins were loaded but not used: %q\n", md.Undecoded()) - } - return loadedInputs, err + return loadConfigIntoInputs(md, conf.Inputs) } func expandEnvVars(contents []byte) string { diff --git a/plugins/inputs/execd/shim/shim_posix_test.go b/plugins/inputs/execd/shim/shim_posix_test.go index 00e5dc6c3f595..873ef89bf655f 100644 --- a/plugins/inputs/execd/shim/shim_posix_test.go +++ b/plugins/inputs/execd/shim/shim_posix_test.go @@ -23,12 +23,9 @@ func TestShimUSR1SignalingWorks(t *testing.T) { stdinReader, stdinWriter := io.Pipe() stdoutReader, stdoutWriter := io.Pipe() - stdin = stdinReader - stdout = stdoutWriter - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - metricProcessed, exited := runInputPlugin(t, 20*time.Minute) + metricProcessed, exited := runInputPlugin(t, 20*time.Minute, stdinReader, stdoutWriter, nil) // signal USR1 to yourself. pid := os.Getpid() diff --git a/plugins/inputs/execd/shim/shim_test.go b/plugins/inputs/execd/shim/shim_test.go index 2a31e5adcbd01..dbc3462211222 100644 --- a/plugins/inputs/execd/shim/shim_test.go +++ b/plugins/inputs/execd/shim/shim_test.go @@ -2,7 +2,6 @@ package shim import ( "bufio" - "bytes" "io" "os" "strings" @@ -16,20 +15,16 @@ import ( ) func TestShimWorks(t *testing.T) { - stdoutBytes := bytes.NewBufferString("") - stdout = stdoutBytes + stdoutReader, stdoutWriter := io.Pipe() - stdin, _ = io.Pipe() // hold the stdin pipe open + stdin, _ := io.Pipe() // hold the stdin pipe open - metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond) + metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond, stdin, stdoutWriter, nil) <-metricProcessed - for stdoutBytes.Len() == 0 { - t.Log("Waiting for bytes available in stdout") - time.Sleep(10 * time.Millisecond) - } - - out := string(stdoutBytes.Bytes()) + r := bufio.NewReader(stdoutReader) + out, err := r.ReadString('\n') + require.NoError(t, err) require.Contains(t, out, "\n") metricLine := strings.Split(out, "\n")[0] require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine) @@ -39,10 +34,7 @@ func TestShimStdinSignalingWorks(t *testing.T) { stdinReader, stdinWriter := io.Pipe() stdoutReader, stdoutWriter := io.Pipe() - stdin = stdinReader - stdout = stdoutWriter - - metricProcessed, exited := runInputPlugin(t, 40*time.Second) + metricProcessed, exited := runInputPlugin(t, 40*time.Second, stdinReader, stdoutWriter, nil) stdinWriter.Write([]byte("\n")) @@ -61,14 +53,24 @@ func TestShimStdinSignalingWorks(t *testing.T) { <-exited } -func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) { - metricProcessed = make(chan bool, 10) +func runInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) { + metricProcessed = make(chan bool) exited = make(chan bool) inp := &testInput{ metricProcessed: metricProcessed, } shim := New() + if stdin != nil { + shim.stdin = stdin + } + if stdout != nil { + shim.stdout = stdout + } + if stderr != nil { + shim.stderr = stderr + } + shim.AddInput(inp) go func() { err := shim.Run(interval) diff --git a/plugins/processors/execd/README.md b/plugins/processors/execd/README.md index f1fdb0b85f9ff..0535dbf5fb054 100644 --- a/plugins/processors/execd/README.md +++ b/plugins/processors/execd/README.md @@ -12,7 +12,7 @@ Program output on standard error is mirrored to the telegraf log. - Metrics with tracking will be considered "delivered" as soon as they are passed to the external process. There is currently no way to match up which metric coming out of the execd process relates to which metric going in (keep in mind - that processors can add and drop metrics, and that this is all done + that processors can add and drop metrics, and that this is all done asynchronously). - it's not currently possible to use a data_format other than "influx", due to the requirement that it is serialize-parse symmetrical and does not lose any