From 36e584e92f22d352ca68cf81e07d34f9e7f4f257 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Thu, 4 Jun 2020 19:09:22 -0400 Subject: [PATCH 1/7] execd processor --- internal/process/process.go | 159 ++++++++++++++++++ internal/process/process_posix.go | 28 +++ internal/process/process_windows.go | 19 +++ plugins/inputs/execd/execd.go | 139 ++------------- plugins/inputs/execd/execd_posix.go | 25 ++- plugins/inputs/execd/execd_test.go | 4 +- plugins/inputs/execd/execd_windows.go | 11 +- plugins/processors/all/all.go | 1 + plugins/processors/execd/README.md | 109 ++++++++++++ plugins/processors/execd/examples/count.go | 24 +++ .../multiplier_line_protocol/multiplier.conf | 14 ++ .../multiplier_line_protocol.rb | 27 +++ plugins/processors/execd/execd.go | 149 ++++++++++++++++ plugins/processors/execd/execd_test.go | 135 +++++++++++++++ 14 files changed, 695 insertions(+), 149 deletions(-) create mode 100644 internal/process/process.go create mode 100644 internal/process/process_posix.go create mode 100644 internal/process/process_windows.go create mode 100644 plugins/processors/execd/README.md create mode 100644 plugins/processors/execd/examples/count.go create mode 100644 plugins/processors/execd/examples/multiplier_line_protocol/multiplier.conf create mode 100644 plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb create mode 100644 plugins/processors/execd/execd.go create mode 100644 plugins/processors/execd/execd_test.go diff --git a/internal/process/process.go b/internal/process/process.go new file mode 100644 index 0000000000000..371c2cd709768 --- /dev/null +++ b/internal/process/process.go @@ -0,0 +1,159 @@ +package process + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "log" + "os/exec" + "sync" + "time" +) + +// Process is a long-running process manager that will restart processes if they stop. +type Process struct { + Cmd *exec.Cmd + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser + ReadStdoutFn func(io.Reader) + ReadStderrFn func(io.Reader) + RestartDelay time.Duration + + cancel context.CancelFunc + mainLoopWg sync.WaitGroup +} + +// New creates a new process wrapper +func New(command []string) (*Process, error) { + p := &Process{ + RestartDelay: 5 * time.Second, + } + if len(command) > 1 { + p.Cmd = exec.Command(command[0], command[1:]...) + } else { + p.Cmd = exec.Command(command[0]) + } + var err error + p.Stdin, err = p.Cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("error opening stdin pipe: %w", err) + } + + p.Stdout, err = p.Cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("error opening stdout pipe: %w", err) + } + + p.Stderr, err = p.Cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("error opening stderr pipe: %w", err) + } + + return p, nil +} + +// Start the process +func (p *Process) Start() error { + p.mainLoopWg.Add(1) + + ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel + + if err := p.cmdStart(); err != nil { + return err + } + + go func() { + if err := p.cmdLoop(ctx); err != nil { + log.Printf("E! [agent] Process quit with message: %v", err) + } + p.mainLoopWg.Done() + }() + + return nil +} + +func (p *Process) Stop() { + if p.cancel != nil { + p.cancel() + } + p.mainLoopWg.Wait() +} + +func (p *Process) cmdStart() error { + log.Printf("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args) + + if err := p.Cmd.Start(); err != nil { + return fmt.Errorf("Error starting process: %s", err) + } + + return nil +} + +// cmdLoop watches an already running process, restarting it when appropriate. +func (p *Process) cmdLoop(ctx context.Context) error { + go func() { + <-ctx.Done() + if p.Stdin != nil { + p.Stdin.Close() + gracefulStop(p.Cmd, 5*time.Second) + } + }() + + for { + err := p.cmdWait() + if isQuitting(ctx) { + log.Printf("Process %s shut down", p.Cmd.Path) + return nil + } + + log.Printf("Process %s terminated: %v", p.Cmd.Path, err) + log.Printf("Restarting in %s...", time.Duration(p.RestartDelay)) + + select { + case <-ctx.Done(): + return nil + case <-time.After(time.Duration(p.RestartDelay)): + // Continue the loop and restart the process + if err := p.cmdStart(); err != nil { + return err + } + } + } +} + +func (p *Process) cmdWait() error { + var wg sync.WaitGroup + + if p.ReadStdoutFn == nil { + p.ReadStdoutFn = defaultReadPipe + } + if p.ReadStderrFn == nil { + p.ReadStderrFn = defaultReadPipe + } + + wg.Add(1) + go func() { + p.ReadStdoutFn(p.Stdout) + wg.Done() + }() + + wg.Add(1) + go func() { + p.ReadStderrFn(p.Stderr) + wg.Done() + }() + + wg.Wait() + return p.Cmd.Wait() +} + +func isQuitting(ctx context.Context) bool { + return ctx.Err() != nil +} + +func defaultReadPipe(r io.Reader) { + io.Copy(ioutil.Discard, r) +} diff --git a/internal/process/process_posix.go b/internal/process/process_posix.go new file mode 100644 index 0000000000000..ab834242272f5 --- /dev/null +++ b/internal/process/process_posix.go @@ -0,0 +1,28 @@ +// +build !windows + +package process + +import ( + "os/exec" + "syscall" + "time" +) + +func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { + time.AfterFunc(timeout, func() { + if cmd == nil || cmd.ProcessState == nil { + return + } + if !cmd.ProcessState.Exited() { + cmd.Process.Signal(syscall.SIGTERM) + time.AfterFunc(timeout, func() { + if cmd == nil || cmd.ProcessState == nil { + return + } + if !cmd.ProcessState.Exited() { + cmd.Process.Kill() + } + }) + } + }) +} diff --git a/internal/process/process_windows.go b/internal/process/process_windows.go new file mode 100644 index 0000000000000..55b78f8813df8 --- /dev/null +++ b/internal/process/process_windows.go @@ -0,0 +1,19 @@ +// +build windows + +package process + +import ( + "os/exec" + "time" +) + +func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { + time.AfterFunc(timeout, func() { + if cmd == nil || cmd.ProcessState == nil { + return + } + if !cmd.ProcessState.Exited() { + cmd.Process.Kill() + } + }) +} diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index ca9e589d95fde..f44f7648e047e 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -2,16 +2,14 @@ package execd import ( "bufio" - "context" "fmt" "io" "log" - "os/exec" - "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/process" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/influx" @@ -46,14 +44,9 @@ type Execd struct { Signal string RestartDelay config.Duration - acc telegraf.Accumulator - cmd *exec.Cmd - parser parsers.Parser - stdin io.WriteCloser - stdout io.ReadCloser - stderr io.ReadCloser - cancel context.CancelFunc - mainLoopWg sync.WaitGroup + process *process.Process + acc telegraf.Accumulator + parser parsers.Parser } func (e *Execd) SampleConfig() string { @@ -70,131 +63,29 @@ func (e *Execd) SetParser(parser parsers.Parser) { func (e *Execd) Start(acc telegraf.Accumulator) error { e.acc = acc - if len(e.Command) == 0 { return fmt.Errorf("FATAL no command specified") } - e.mainLoopWg.Add(1) - - ctx, cancel := context.WithCancel(context.Background()) - e.cancel = cancel - - if err := e.cmdStart(); err != nil { - return err - } - - go func() { - if err := e.cmdLoop(ctx); err != nil { - log.Printf("Process quit with message: %s", err.Error()) - } - e.mainLoopWg.Done() - }() - - return nil -} - -func (e *Execd) Stop() { - // don't try to stop before all stream readers have started. - e.cancel() - e.mainLoopWg.Wait() -} - -// cmdLoop watches an already running process, restarting it when appropriate. -func (e *Execd) cmdLoop(ctx context.Context) error { - for { - // Use a buffered channel to ensure goroutine below can exit - // if `ctx.Done` is selected and nothing reads on `done` anymore - done := make(chan error, 1) - go func() { - done <- e.cmdWait() - }() - - select { - case <-ctx.Done(): - if e.stdin != nil { - e.stdin.Close() - gracefulStop(e.cmd, 5*time.Second) - } - return nil - case err := <-done: - log.Printf("Process %s terminated: %s", e.Command, err) - if isQuitting(ctx) { - return err - } - } - - log.Printf("Restarting in %s...", time.Duration(e.RestartDelay)) - - select { - case <-ctx.Done(): - return nil - case <-time.After(time.Duration(e.RestartDelay)): - // Continue the loop and restart the process - if err := e.cmdStart(); err != nil { - return err - } - } - } -} - -func isQuitting(ctx context.Context) bool { - select { - case <-ctx.Done(): - return true - default: - return false - } -} - -func (e *Execd) cmdStart() (err error) { - if len(e.Command) > 1 { - e.cmd = exec.Command(e.Command[0], e.Command[1:]...) - } else { - e.cmd = exec.Command(e.Command[0]) - } - - e.stdin, err = e.cmd.StdinPipe() - if err != nil { - return fmt.Errorf("Error opening stdin pipe: %s", err) - } - - e.stdout, err = e.cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("Error opening stdout pipe: %s", err) - } - - e.stderr, err = e.cmd.StderrPipe() + var err error + e.process, err = process.New(e.Command) if err != nil { - return fmt.Errorf("Error opening stderr pipe: %s", err) + return fmt.Errorf("Error creating new process: %w", err) } - log.Printf("Starting process: %s", e.Command) + e.process.RestartDelay = time.Duration(e.RestartDelay) + e.process.ReadStdoutFn = e.cmdReadOut + e.process.ReadStderrFn = e.cmdReadErr - err = e.cmd.Start() - if err != nil { - return fmt.Errorf("Error starting process: %s", err) + if err = e.process.Start(); err != nil { + return fmt.Errorf("failed to start process %s: %w", e.Command, err) } return nil } -func (e *Execd) cmdWait() error { - var wg sync.WaitGroup - wg.Add(2) - - go func() { - e.cmdReadOut(e.stdout) - wg.Done() - }() - - go func() { - e.cmdReadErr(e.stderr) - wg.Done() - }() - - wg.Wait() - return e.cmd.Wait() +func (e *Execd) Stop() { + e.process.Stop() } func (e *Execd) cmdReadOut(out io.Reader) { @@ -249,7 +140,7 @@ func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) for scanner.Scan() { - log.Printf("stderr: %q", scanner.Text()) + log.Printf("[inputs.execd] stderr: %q", scanner.Text()) } if err := scanner.Err(); err != nil { diff --git a/plugins/inputs/execd/execd_posix.go b/plugins/inputs/execd/execd_posix.go index cc3a8e8bb5aac..4d8789a8d3215 100644 --- a/plugins/inputs/execd/execd_posix.go +++ b/plugins/inputs/execd/execd_posix.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "os" - "os/exec" "syscall" "time" @@ -14,22 +13,26 @@ import ( ) func (e *Execd) Gather(acc telegraf.Accumulator) error { - if e.cmd == nil || e.cmd.Process == nil { + if e.process == nil || e.process.Cmd == nil { return nil } + osProcess := e.process.Cmd.Process + if osProcess == nil { + return nil + } switch e.Signal { case "SIGHUP": - e.cmd.Process.Signal(syscall.SIGHUP) + osProcess.Signal(syscall.SIGHUP) case "SIGUSR1": - e.cmd.Process.Signal(syscall.SIGUSR1) + osProcess.Signal(syscall.SIGUSR1) case "SIGUSR2": - e.cmd.Process.Signal(syscall.SIGUSR2) + osProcess.Signal(syscall.SIGUSR2) case "STDIN": - if osStdin, ok := e.stdin.(*os.File); ok { + if osStdin, ok := e.process.Stdin.(*os.File); ok { osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second)) } - if _, err := io.WriteString(e.stdin, "\n"); err != nil { + if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil { return fmt.Errorf("Error writing to stdin: %s", err) } case "none": @@ -39,11 +42,3 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { return nil } - -func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { - cmd.Process.Signal(syscall.SIGTERM) - go func() { - <-time.NewTimer(timeout).C - cmd.Process.Kill() - }() -} diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 52c0a214b6ea6..1cdbfdc5f6d20 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -19,13 +19,13 @@ import ( ) func TestExternalInputWorks(t *testing.T) { - jsonParser, err := parsers.NewInfluxParser() + influxParser, err := parsers.NewInfluxParser() require.NoError(t, err) e := &Execd{ Command: []string{shell(), fileShellScriptPath()}, RestartDelay: config.Duration(5 * time.Second), - parser: jsonParser, + parser: influxParser, Signal: "STDIN", } diff --git a/plugins/inputs/execd/execd_windows.go b/plugins/inputs/execd/execd_windows.go index 82935d4ac87c3..15e6798f2389b 100644 --- a/plugins/inputs/execd/execd_windows.go +++ b/plugins/inputs/execd/execd_windows.go @@ -6,23 +6,22 @@ import ( "fmt" "io" "os" - "os/exec" "time" "github.com/influxdata/telegraf" ) func (e *Execd) Gather(acc telegraf.Accumulator) error { - if e.cmd == nil || e.cmd.Process == nil { + if e.process == nil { return nil } switch e.Signal { case "STDIN": - if osStdin, ok := e.stdin.(*os.File); ok { + if osStdin, ok := e.process.Stdin.(*os.File); ok { osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second)) } - if _, err := io.WriteString(e.stdin, "\n"); err != nil { + if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil { return fmt.Errorf("Error writing to stdin: %s", err) } case "none": @@ -32,7 +31,3 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { return nil } - -func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { - cmd.Process.Kill() -} diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 6dc2e2b0d6589..ad8d8616c2ba1 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/dedup" _ "github.com/influxdata/telegraf/plugins/processors/defaults" _ "github.com/influxdata/telegraf/plugins/processors/enum" + _ "github.com/influxdata/telegraf/plugins/processors/execd" _ "github.com/influxdata/telegraf/plugins/processors/filepath" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" diff --git a/plugins/processors/execd/README.md b/plugins/processors/execd/README.md new file mode 100644 index 0000000000000..5e779521a6c5e --- /dev/null +++ b/plugins/processors/execd/README.md @@ -0,0 +1,109 @@ +# Execd Processor Plugin + +The `execd` processor plugin runs an external program as a separate process and +pipes metrics in to the process's STDIN and reads processed metrics from its STDOUT. +The programs must accept influx line protocol on standard in (STDIN) and output +metrics in influx line protocol to standard output (STDOUT). + +Program output on standard error is mirrored to the telegraf log. + +### Caveats + +- Metrics with tracking will be considered "delivered" as soon as they are passed + to the external process. There is currently no way to match up which metric + coming out of the execd process relates to which metric going in (keep in mind + that processors can add and drop metrics, and that this is all done + asynchronously). +- it's not currently possible to use a data_format other than "influx", due to + the requirement that it is serialize-parse symmetrical and does not lose any + critical type data. + +### Configuration: + +```toml +[[processor.execd]] + ## Program to run as daemon + command = ["/path/to/your_program", "arg1", "arg2"] + + ## Delay before the process is restarted after an unexpected termination + # restart_delay = "10s" +``` + +### Example + +#### Go daemon example + +This go daemon reads a metric from stdin, multiplies the "count" field by 2, +and writes the metric back out. + +```go +package main + +import ( + "fmt" + "os" + + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/serializers" +) + +func main() { + parser := influx.NewStreamParser(os.Stdin) + serializer, _ := serializers.NewInfluxSerializer() + + for { + metric, err := parser.Next() + if err != nil { + if err == influx.EOF { + return // stream ended + } + if parseErr, isParseError := err.(*influx.ParseError); isParseError { + fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) + os.Exit(1) + } + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + + c, found := metric.GetField("count") + if !found { + fmt.Fprintf(os.Stderr, "metric has no count field\n") + os.Exit(1) + } + switch t := c.(type) { + case float64: + t *= 2 + metric.AddField("count", t) + case int64: + t *= 2 + metric.AddField("count", t) + default: + fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c) + os.Exit(1) + } + b, err := serializer.Serialize(metric) + if err != nil { + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + fmt.Fprint(os.Stdout, string(b)) + } +} +``` + +to run it, you'd build the binary using go, eg `go build -o multiplier.exe main.go` + +```toml +[[processors.execd]] + command = ["multiplier.exe"] +``` + +#### Ruby daemon using SIGHUP + +- See [Ruby daemon](./examples/multiplier_line_protocol/multiplier_line_protocol.rb) + +```toml +[[processors.execd]] + command = ["ruby", "plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb"] +``` diff --git a/plugins/processors/execd/examples/count.go b/plugins/processors/execd/examples/count.go new file mode 100644 index 0000000000000..d5e4a12e13359 --- /dev/null +++ b/plugins/processors/execd/examples/count.go @@ -0,0 +1,24 @@ +package main + +// Example using HUP signaling + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) + + counter := 0 + + for { + <-c + + fmt.Printf("counter_go count=%d\n", counter) + counter++ + } +} diff --git a/plugins/processors/execd/examples/multiplier_line_protocol/multiplier.conf b/plugins/processors/execd/examples/multiplier_line_protocol/multiplier.conf new file mode 100644 index 0000000000000..120e04cbb73f1 --- /dev/null +++ b/plugins/processors/execd/examples/multiplier_line_protocol/multiplier.conf @@ -0,0 +1,14 @@ +[agent] + interval = "10s" + +[[inputs.execd]] + command = ["ruby", "plugins/inputs/execd/examples/count.rb"] + +[[processors.execd]] + command = ["ruby", "plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb"] + + +[[outputs.file]] + files = ["stdout"] + data_format = "influx" + diff --git a/plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb b/plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb new file mode 100644 index 0000000000000..6949a310e8754 --- /dev/null +++ b/plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb @@ -0,0 +1,27 @@ +#!/usr/bin/env ruby + +loop do + # example input: "counter_ruby count=0 1586302128978187000" + line = STDIN.readline.chomp + # parse out influx line protocol sections with a really simple hand-rolled parser that doesn't support escaping. + # for a full line parser in ruby, check out something like the influxdb-lineprotocol-parser gem. + parts = line.split(" ") + case parts.size + when 3 + measurement, fields, timestamp = parts + when 4 + measurement, tags, fields, timestamp = parts + else + STDERR.puts "Unable to parse line protocol" + exit 1 + end + fields = fields.split(",").map{|t| + k,v = t.split("=") + if k == "count" + v = v.to_i * 2 # multiple count metric by two + end + "#{k}=#{v}" + }.join(",") + puts [measurement, tags, fields, timestamp].select{|s| s && s.size != 0 }.join(" ") + STDOUT.flush +end diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go new file mode 100644 index 0000000000000..49d02458b85ec --- /dev/null +++ b/plugins/processors/execd/execd.go @@ -0,0 +1,149 @@ +package execd + +import ( + "bufio" + "fmt" + "io" + "log" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/process" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/processors" + "github.com/influxdata/telegraf/plugins/serializers" +) + +const sampleConfig = ` + ## Program to run as daemon + command = ["telegraf-smartctl", "-d", "/dev/sda"] + + ## Delay before the process is restarted after an unexpected termination + restart_delay = "10s" +` + +type Execd struct { + Command []string `toml:"command"` + RestartDelay config.Duration `toml:"restart_delay"` + + parserConfig *parsers.Config + parser parsers.Parser + serializerConfig *serializers.Config + serializer serializers.Serializer + acc telegraf.Accumulator + process *process.Process +} + +func New() *Execd { + return &Execd{ + RestartDelay: config.Duration(10 * time.Second), + parserConfig: &parsers.Config{ + DataFormat: "influx", + }, + serializerConfig: &serializers.Config{ + DataFormat: "influx", + }, + } +} + +func (e *Execd) SampleConfig() string { + return sampleConfig +} + +func (e *Execd) Description() string { + return "Run executable as long-running processor plugin" +} + +func (e *Execd) Start(acc telegraf.Accumulator) error { + var err error + e.parser, err = parsers.NewParser(e.parserConfig) + if err != nil { + return fmt.Errorf("error creating parser: %w", err) + } + e.serializer, err = serializers.NewSerializer(e.serializerConfig) + if err != nil { + return fmt.Errorf("error creating serializer: %w", err) + } + e.acc = acc + + if len(e.Command) == 0 { + return fmt.Errorf("no command specified") + } + + e.process, err = process.New(e.Command) + if err != nil { + return fmt.Errorf("error creating new process: %w", err) + } + + e.process.RestartDelay = time.Duration(e.RestartDelay) + e.process.ReadStdoutFn = e.cmdReadOut + e.process.ReadStderrFn = e.cmdReadErr + + if err = e.process.Start(); err != nil { + return fmt.Errorf("failed to start process %s: %w", e.Command, err) + } + + return nil +} + +func (e *Execd) Add(m telegraf.Metric, acc telegraf.Accumulator) { + b, err := e.serializer.Serialize(m) + if err != nil { + acc.AddError(fmt.Errorf("metric serializing error: %w", err)) + return + } + + _, err = e.process.Stdin.Write(b) + if err != nil { + acc.AddError(fmt.Errorf("error writing to process stdin: %w", err)) + return + } + + // We cannot maintain tracking metrics at the moment because input/output + // is done asynchronously and we don't have any metric metadata to tie the + // output metric back to the original input metric. + m.Drop() +} + +func (e *Execd) Stop() error { + e.process.Stop() + return nil +} + +func (e *Execd) cmdReadOut(out io.Reader) { + scanner := bufio.NewScanner(out) + + for scanner.Scan() { + metrics, err := e.parser.Parse(scanner.Bytes()) + if err != nil { + log.Println(fmt.Errorf("Parse error: %s", err)) + } + + for _, metric := range metrics { + e.acc.AddMetric(metric) + } + } + + if err := scanner.Err(); err != nil { + log.Println(fmt.Errorf("Error reading stdout: %s", err)) + } +} + +func (e *Execd) cmdReadErr(out io.Reader) { + scanner := bufio.NewScanner(out) + + for scanner.Scan() { + log.Printf("stderr: %q", scanner.Text()) + } + + if err := scanner.Err(); err != nil { + log.Println(fmt.Errorf("Error reading stderr: %s", err)) + } +} + +func init() { + processors.AddStreaming("execd", func() telegraf.StreamingProcessor { + return New() + }) +} diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go new file mode 100644 index 0000000000000..d25dece64b5e2 --- /dev/null +++ b/plugins/processors/execd/execd_test.go @@ -0,0 +1,135 @@ +package execd + +import ( + "flag" + "fmt" + "os" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestExternalProcessorWorks(t *testing.T) { + e := New() + exe, err := os.Executable() + require.NoError(t, err) + t.Log(exe) + e.Command = []string{exe, "-countmultiplier"} + e.RestartDelay = config.Duration(5 * time.Second) + + acc := &testutil.Accumulator{} + + require.NoError(t, e.Start(acc)) + + now := time.Now() + metrics := []telegraf.Metric{} + for i := 0; i < 10; i++ { + m, err := metric.New("test", + map[string]string{ + "city": "Toronto", + }, + map[string]interface{}{ + "population": 6000000, + "count": 1, + }, + now) + require.NoError(t, err) + metrics = append(metrics, m) + now = now.Add(1) + + e.Add(m, acc) + } + + acc.Wait(1) + m := acc.GetTelegrafMetrics()[0] + + require.NoError(t, e.Stop()) + + require.Equal(t, "test", m.Name()) + + city, ok := m.Tags()["city"] + require.True(t, ok) + require.EqualValues(t, "Toronto", city) + + val, ok := m.Fields()["population"] + require.True(t, ok) + require.EqualValues(t, 6000000, val) + + val, ok = m.Fields()["count"] + require.True(t, ok) + require.EqualValues(t, 2, val) + + metricTime := m.Time().UnixNano() + + // read the other 9 and make sure they're ordered properly + acc.Wait(9) + metrics = acc.GetTelegrafMetrics() + for i := 0; i < 9; i++ { + m = metrics[i+1] + require.EqualValues(t, metricTime+1, m.Time().UnixNano()) + metricTime = m.Time().UnixNano() + } +} + +var countmultiplier = flag.Bool("countmultiplier", false, + "if true, act like line input program instead of test") + +func TestMain(m *testing.M) { + flag.Parse() + if *countmultiplier { + runCountMultiplierProgram() + os.Exit(0) + } + code := m.Run() + os.Exit(code) +} + +func runCountMultiplierProgram() { + parser := influx.NewStreamParser(os.Stdin) + serializer, _ := serializers.NewInfluxSerializer() + + for { + metric, err := parser.Next() + if err != nil { + if err == influx.EOF { + return // stream ended + } + if parseErr, isParseError := err.(*influx.ParseError); isParseError { + fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) + os.Exit(1) + } + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + + c, found := metric.GetField("count") + if !found { + fmt.Fprintf(os.Stderr, "metric has no count field\n") + os.Exit(1) + } + switch t := c.(type) { + case float64: + t *= 2 + metric.AddField("count", t) + case int64: + t *= 2 + metric.AddField("count", t) + default: + fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c) + os.Exit(1) + } + b, err := serializer.Serialize(metric) + if err != nil { + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + fmt.Fprint(os.Stdout, string(b)) + } +} From 8ddb8f9f262fbd5823335b10836b9a5bc4ad30b2 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Wed, 24 Jun 2020 11:14:21 -0400 Subject: [PATCH 2/7] remove processors/execd/examples/count.go --- plugins/processors/execd/examples/count.go | 24 ---------------------- 1 file changed, 24 deletions(-) delete mode 100644 plugins/processors/execd/examples/count.go diff --git a/plugins/processors/execd/examples/count.go b/plugins/processors/execd/examples/count.go deleted file mode 100644 index d5e4a12e13359..0000000000000 --- a/plugins/processors/execd/examples/count.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -// Example using HUP signaling - -import ( - "fmt" - "os" - "os/signal" - "syscall" -) - -func main() { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGHUP) - - counter := 0 - - for { - <-c - - fmt.Printf("counter_go count=%d\n", counter) - counter++ - } -} From 31754635a35eb27646a78313982a076ac578e50c Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Wed, 24 Jun 2020 13:06:05 -0400 Subject: [PATCH 3/7] fix after rebase --- plugins/processors/execd/execd.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go index 49d02458b85ec..f2026fd0e203b 100644 --- a/plugins/processors/execd/execd.go +++ b/plugins/processors/execd/execd.go @@ -87,23 +87,22 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { return nil } -func (e *Execd) Add(m telegraf.Metric, acc telegraf.Accumulator) { +func (e *Execd) Add(m telegraf.Metric, acc telegraf.Accumulator) error { b, err := e.serializer.Serialize(m) if err != nil { - acc.AddError(fmt.Errorf("metric serializing error: %w", err)) - return + return fmt.Errorf("metric serializing error: %w", err) } _, err = e.process.Stdin.Write(b) if err != nil { - acc.AddError(fmt.Errorf("error writing to process stdin: %w", err)) - return + return fmt.Errorf("error writing to process stdin: %w", err) } // We cannot maintain tracking metrics at the moment because input/output // is done asynchronously and we don't have any metric metadata to tie the // output metric back to the original input metric. m.Drop() + return nil } func (e *Execd) Stop() error { From 88b09cf18c3deb5d720ad759242574c2fadfa9bd Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Wed, 24 Jun 2020 13:29:44 -0400 Subject: [PATCH 4/7] fix race --- testutil/accumulator.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 6e5148ef730f2..870eb9e04af8f 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -55,6 +55,8 @@ func (a *Accumulator) NMetrics() uint64 { } func (a *Accumulator) GetTelegrafMetrics() []telegraf.Metric { + a.Lock() + defer a.Unlock() metrics := []telegraf.Metric{} for _, m := range a.Metrics { metrics = append(metrics, FromTestMetric(m)) @@ -251,6 +253,8 @@ func (a *Accumulator) SetDebug(debug bool) { // Get gets the specified measurement point from the accumulator func (a *Accumulator) Get(measurement string) (*Metric, bool) { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { return p, true @@ -261,6 +265,8 @@ func (a *Accumulator) Get(measurement string) (*Metric, bool) { } func (a *Accumulator) HasTag(measurement string, key string) bool { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { _, ok := p.Tags[key] @@ -271,6 +277,8 @@ func (a *Accumulator) HasTag(measurement string, key string) bool { } func (a *Accumulator) TagSetValue(measurement string, key string) string { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { v, ok := p.Tags[key] @@ -283,6 +291,8 @@ func (a *Accumulator) TagSetValue(measurement string, key string) string { } func (a *Accumulator) TagValue(measurement string, key string) string { + a.Lock() + defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { v, ok := p.Tags[key] @@ -323,13 +333,13 @@ func (a *Accumulator) NFields() int { // Wait waits for the given number of metrics to be added to the accumulator. func (a *Accumulator) Wait(n int) { a.Lock() + defer a.Unlock() if a.Cond == nil { a.Cond = sync.NewCond(&a.Mutex) } for int(a.NMetrics()) < n { a.Cond.Wait() } - a.Unlock() } // WaitError waits for the given number of errors to be added to the accumulator. From 9785d3c2c68e550bd53df2f3a4b3f5e835d4aeb4 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Fri, 26 Jun 2020 16:38:07 -0400 Subject: [PATCH 5/7] address feedback --- agent/agent.go | 1 + docs/PROCESSORS.md | 75 ++++++++++++++++++++++++++ internal/process/process.go | 24 +++++---- internal/process/process_posix.go | 4 +- internal/process/process_windows.go | 2 +- plugins/inputs/execd/execd.go | 26 +++++---- plugins/inputs/execd/execd_test.go | 3 ++ plugins/processors/execd/README.md | 3 +- plugins/processors/execd/execd.go | 29 +++++----- plugins/processors/execd/execd_test.go | 27 +++++----- 10 files changed, 145 insertions(+), 49 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a48ed590af228..0ff5893785293 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -510,6 +510,7 @@ func (a *Agent) runProcessors( for m := range unit.src { if err := unit.processor.Add(m, acc); err != nil { acc.AddError(err) + m.Drop() } } unit.processor.Stop() diff --git a/docs/PROCESSORS.md b/docs/PROCESSORS.md index 6ea82fdae3309..45a4eb2777c3e 100644 --- a/docs/PROCESSORS.md +++ b/docs/PROCESSORS.md @@ -64,6 +64,81 @@ func init() { } ``` +### Streaming Processors + +Streaming processors are a new processor type available to you. They are +particularly useful to implement processor types that use background processes +or goroutines to process multiple metrics at the same time. Some examples of this +are the execd processor, which pipes metrics out to an external process over stdin +and reads them back over stdout, and the reverse_dns processor, which does reverse +dns lookups on IP addresses in fields. While both of these come with a speed cost, +it would be significantly worse if you had to process one metric completely from +start to finish before handling the next metric, and thus they benefit +significantly from a streaming-pipe approach. + +Some differences from classic Processors: + +* Streaming processors must conform to the [telegraf.StreamingProcessor][] interface. +* Processors should call `processors.AddStreaming` in their `init` function to register + themselves. See below for a quick example. + +### Streaming Processor Example + +```go +package printer + +// printer.go + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Printer struct { +} + +var sampleConfig = ` +` + +func (p *Printer) SampleConfig() string { + return sampleConfig +} + +func (p *Printer) Description() string { + return "Print all metrics that pass through this filter." +} + +func (p *Printer) Init() error { + return nil +} + +func (p *Printer) Start(acc telegraf.Accumulator) error { +} + +func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { + // print! + fmt.Println(metric.String()) + + // pass the metric downstream, or metric.Drop() it. + // Metric will be dropped if this function returns an error. + acc.AddMetric(metric) + + return nil +} + +func (p *Printer) Stop() error { +} + +func init() { + processors.AddStreaming("printer", func() telegraf.StreamingProcessor { + return &Printer{} + }) +} +``` + [SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig [CodeStyle]: https://github.com/influxdata/telegraf/wiki/CodeStyle [telegraf.Processor]: https://godoc.org/github.com/influxdata/telegraf#Processor +[telegraf.StreamingProcessor]: https://godoc.org/github.com/influxdata/telegraf#StreamingProcessor diff --git a/internal/process/process.go b/internal/process/process.go index 371c2cd709768..b7fd77b92d578 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -2,13 +2,15 @@ package process import ( "context" + "errors" "fmt" "io" "io/ioutil" - "log" "os/exec" "sync" "time" + + "github.com/influxdata/telegraf" ) // Process is a long-running process manager that will restart processes if they stop. @@ -20,6 +22,7 @@ type Process struct { ReadStdoutFn func(io.Reader) ReadStderrFn func(io.Reader) RestartDelay time.Duration + Log telegraf.Logger cancel context.CancelFunc mainLoopWg sync.WaitGroup @@ -27,6 +30,10 @@ type Process struct { // New creates a new process wrapper func New(command []string) (*Process, error) { + if len(command) == 0 { + return nil, errors.New("no command") + } + p := &Process{ RestartDelay: 5 * time.Second, } @@ -56,8 +63,6 @@ func New(command []string) (*Process, error) { // Start the process func (p *Process) Start() error { - p.mainLoopWg.Add(1) - ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel @@ -65,9 +70,10 @@ func (p *Process) Start() error { return err } + p.mainLoopWg.Add(1) go func() { if err := p.cmdLoop(ctx); err != nil { - log.Printf("E! [agent] Process quit with message: %v", err) + p.Log.Errorf("Process quit with message: %v", err) } p.mainLoopWg.Done() }() @@ -83,10 +89,10 @@ func (p *Process) Stop() { } func (p *Process) cmdStart() error { - log.Printf("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args) + p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args) if err := p.Cmd.Start(); err != nil { - return fmt.Errorf("Error starting process: %s", err) + return fmt.Errorf("error starting process: %s", err) } return nil @@ -105,12 +111,12 @@ func (p *Process) cmdLoop(ctx context.Context) error { for { err := p.cmdWait() if isQuitting(ctx) { - log.Printf("Process %s shut down", p.Cmd.Path) + p.Log.Infof("Process %s shut down", p.Cmd.Path) return nil } - log.Printf("Process %s terminated: %v", p.Cmd.Path, err) - log.Printf("Restarting in %s...", time.Duration(p.RestartDelay)) + p.Log.Errorf("Process %s exited: %v", p.Cmd.Path, err) + p.Log.Infof("Restarting in %s...", time.Duration(p.RestartDelay)) select { case <-ctx.Done(): diff --git a/internal/process/process_posix.go b/internal/process/process_posix.go index ab834242272f5..f459e00e2fa6c 100644 --- a/internal/process/process_posix.go +++ b/internal/process/process_posix.go @@ -10,13 +10,13 @@ import ( func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { time.AfterFunc(timeout, func() { - if cmd == nil || cmd.ProcessState == nil { + if cmd.ProcessState == nil { return } if !cmd.ProcessState.Exited() { cmd.Process.Signal(syscall.SIGTERM) time.AfterFunc(timeout, func() { - if cmd == nil || cmd.ProcessState == nil { + if cmd.ProcessState == nil { return } if !cmd.ProcessState.Exited() { diff --git a/internal/process/process_windows.go b/internal/process/process_windows.go index 55b78f8813df8..fc110841561f9 100644 --- a/internal/process/process_windows.go +++ b/internal/process/process_windows.go @@ -9,7 +9,7 @@ import ( func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { time.AfterFunc(timeout, func() { - if cmd == nil || cmd.ProcessState == nil { + if cmd.ProcessState == nil { return } if !cmd.ProcessState.Exited() { diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index f44f7648e047e..00479cb3e304f 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -2,9 +2,9 @@ package execd import ( "bufio" + "errors" "fmt" "io" - "log" "time" "github.com/influxdata/telegraf" @@ -43,6 +43,7 @@ type Execd struct { Command []string Signal string RestartDelay config.Duration + Log telegraf.Logger process *process.Process acc telegraf.Accumulator @@ -63,16 +64,12 @@ func (e *Execd) SetParser(parser parsers.Parser) { func (e *Execd) Start(acc telegraf.Accumulator) error { e.acc = acc - if len(e.Command) == 0 { - return fmt.Errorf("FATAL no command specified") - } - var err error e.process, err = process.New(e.Command) if err != nil { - return fmt.Errorf("Error creating new process: %w", err) + return fmt.Errorf("error creating new process: %w", err) } - + e.process.Log = e.Log e.process.RestartDelay = time.Duration(e.RestartDelay) e.process.ReadStdoutFn = e.cmdReadOut e.process.ReadStderrFn = e.cmdReadErr @@ -100,7 +97,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { for scanner.Scan() { metrics, err := e.parser.Parse(scanner.Bytes()) if err != nil { - e.acc.AddError(fmt.Errorf("Parse error: %s", err)) + e.acc.AddError(fmt.Errorf("parse error: %w", err)) } for _, metric := range metrics { @@ -109,7 +106,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { } if err := scanner.Err(); err != nil { - e.acc.AddError(fmt.Errorf("Error reading stdout: %s", err)) + e.acc.AddError(fmt.Errorf("error reading stdout: %w", err)) } } @@ -140,14 +137,21 @@ func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) for scanner.Scan() { - log.Printf("[inputs.execd] stderr: %q", scanner.Text()) + e.Log.Errorf("stderr: %q", scanner.Text()) } if err := scanner.Err(); err != nil { - e.acc.AddError(fmt.Errorf("Error reading stderr: %s", err)) + e.acc.AddError(fmt.Errorf("error reading stderr: %w", err)) } } +func (e *Execd) Init() error { + if len(e.Command) == 0 { + return errors.New("no command specified") + } + return nil +} + func init() { inputs.Add("execd", func() telegraf.Input { return &Execd{ diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 1cdbfdc5f6d20..ce046568c6dcf 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/models" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/parsers" @@ -28,6 +29,7 @@ func TestExternalInputWorks(t *testing.T) { parser: influxParser, Signal: "STDIN", } + e.Log = testutil.Logger{} metrics := make(chan telegraf.Metric, 10) defer close(metrics) @@ -64,6 +66,7 @@ func TestParsesLinesContainingNewline(t *testing.T) { Signal: "STDIN", acc: acc, } + e.Log = testutil.Logger{} cases := []struct { Name string diff --git a/plugins/processors/execd/README.md b/plugins/processors/execd/README.md index 5e779521a6c5e..f1fdb0b85f9ff 100644 --- a/plugins/processors/execd/README.md +++ b/plugins/processors/execd/README.md @@ -23,7 +23,8 @@ Program output on standard error is mirrored to the telegraf log. ```toml [[processor.execd]] ## Program to run as daemon - command = ["/path/to/your_program", "arg1", "arg2"] + ## eg: command = ["/path/to/your_program", "arg1", "arg2"] + command = ["cat"] ## Delay before the process is restarted after an unexpected termination # restart_delay = "10s" diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go index f2026fd0e203b..5e4bbc53f0274 100644 --- a/plugins/processors/execd/execd.go +++ b/plugins/processors/execd/execd.go @@ -2,9 +2,9 @@ package execd import ( "bufio" + "errors" "fmt" "io" - "log" "time" "github.com/influxdata/telegraf" @@ -16,8 +16,9 @@ import ( ) const sampleConfig = ` - ## Program to run as daemon - command = ["telegraf-smartctl", "-d", "/dev/sda"] + ## Program to run as daemon + ## eg: command = ["/path/to/your_program", "arg1", "arg2"] + command = ["cat"] ## Delay before the process is restarted after an unexpected termination restart_delay = "10s" @@ -26,6 +27,7 @@ const sampleConfig = ` type Execd struct { Command []string `toml:"command"` RestartDelay config.Duration `toml:"restart_delay"` + Log telegraf.Logger parserConfig *parsers.Config parser parsers.Parser @@ -67,15 +69,11 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { } e.acc = acc - if len(e.Command) == 0 { - return fmt.Errorf("no command specified") - } - e.process, err = process.New(e.Command) if err != nil { return fmt.Errorf("error creating new process: %w", err) } - + e.process.Log = e.Log e.process.RestartDelay = time.Duration(e.RestartDelay) e.process.ReadStdoutFn = e.cmdReadOut e.process.ReadStderrFn = e.cmdReadErr @@ -116,7 +114,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { for scanner.Scan() { metrics, err := e.parser.Parse(scanner.Bytes()) if err != nil { - log.Println(fmt.Errorf("Parse error: %s", err)) + e.Log.Errorf("Parse error: %s", err) } for _, metric := range metrics { @@ -125,7 +123,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { } if err := scanner.Err(); err != nil { - log.Println(fmt.Errorf("Error reading stdout: %s", err)) + e.Log.Errorf("Error reading stdout: %s", err) } } @@ -133,14 +131,21 @@ func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) for scanner.Scan() { - log.Printf("stderr: %q", scanner.Text()) + e.Log.Errorf("stderr: %q", scanner.Text()) } if err := scanner.Err(); err != nil { - log.Println(fmt.Errorf("Error reading stderr: %s", err)) + e.Log.Errorf("Error reading stderr: %s", err) } } +func (e *Execd) Init() error { + if len(e.Command) == 0 { + return errors.New("no command specified") + } + return nil +} + func init() { processors.AddStreaming("execd", func() telegraf.StreamingProcessor { return New() diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go index d25dece64b5e2..669d6601ccfee 100644 --- a/plugins/processors/execd/execd_test.go +++ b/plugins/processors/execd/execd_test.go @@ -18,6 +18,8 @@ import ( func TestExternalProcessorWorks(t *testing.T) { e := New() + e.Log = testutil.Logger{} + exe, err := os.Executable() require.NoError(t, err) t.Log(exe) @@ -29,6 +31,7 @@ func TestExternalProcessorWorks(t *testing.T) { require.NoError(t, e.Start(acc)) now := time.Now() + orig := now metrics := []telegraf.Metric{} for i := 0; i < 10; i++ { m, err := metric.New("test", @@ -52,19 +55,17 @@ func TestExternalProcessorWorks(t *testing.T) { require.NoError(t, e.Stop()) - require.Equal(t, "test", m.Name()) - - city, ok := m.Tags()["city"] - require.True(t, ok) - require.EqualValues(t, "Toronto", city) - - val, ok := m.Fields()["population"] - require.True(t, ok) - require.EqualValues(t, 6000000, val) - - val, ok = m.Fields()["count"] - require.True(t, ok) - require.EqualValues(t, 2, val) + expected := testutil.MustMetric("test", + map[string]string{ + "city": "Toronto", + }, + map[string]interface{}{ + "population": 6000000, + "count": 2, + }, + orig, + ) + testutil.RequireMetricEqual(t, expected, m) metricTime := m.Time().UnixNano() From 5c8666d1af0fe9e04cfbb112c4e3295251c86f59 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Fri, 26 Jun 2020 17:29:39 -0400 Subject: [PATCH 6/7] clean up tests --- plugins/inputs/syslog/rfc5426_test.go | 4 ---- plugins/processors/execd/execd_test.go | 7 +++---- testutil/accumulator.go | 10 ---------- 3 files changed, 3 insertions(+), 18 deletions(-) diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index 5e65c1d39d35a..f0720a7ac524e 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -235,10 +235,6 @@ func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) require.NoError(t, receiver.Start(acc)) defer receiver.Stop() - // Clear - acc.ClearMetrics() - acc.Errors = make([]error, 0) - // Connect conn, err := net.Dial(protocol, address) require.NotNil(t, conn) diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go index 669d6601ccfee..6f9c6b36b1639 100644 --- a/plugins/processors/execd/execd_test.go +++ b/plugins/processors/execd/execd_test.go @@ -51,10 +51,11 @@ func TestExternalProcessorWorks(t *testing.T) { } acc.Wait(1) - m := acc.GetTelegrafMetrics()[0] - require.NoError(t, e.Stop()) + metrics = acc.GetTelegrafMetrics() + m := metrics[0] + expected := testutil.MustMetric("test", map[string]string{ "city": "Toronto", @@ -70,8 +71,6 @@ func TestExternalProcessorWorks(t *testing.T) { metricTime := m.Time().UnixNano() // read the other 9 and make sure they're ordered properly - acc.Wait(9) - metrics = acc.GetTelegrafMetrics() for i := 0; i < 9; i++ { m = metrics[i+1] require.EqualValues(t, metricTime+1, m.Time().UnixNano()) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 870eb9e04af8f..b05fd464c7260 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -55,8 +55,6 @@ func (a *Accumulator) NMetrics() uint64 { } func (a *Accumulator) GetTelegrafMetrics() []telegraf.Metric { - a.Lock() - defer a.Unlock() metrics := []telegraf.Metric{} for _, m := range a.Metrics { metrics = append(metrics, FromTestMetric(m)) @@ -253,8 +251,6 @@ func (a *Accumulator) SetDebug(debug bool) { // Get gets the specified measurement point from the accumulator func (a *Accumulator) Get(measurement string) (*Metric, bool) { - a.Lock() - defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { return p, true @@ -265,8 +261,6 @@ func (a *Accumulator) Get(measurement string) (*Metric, bool) { } func (a *Accumulator) HasTag(measurement string, key string) bool { - a.Lock() - defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { _, ok := p.Tags[key] @@ -277,8 +271,6 @@ func (a *Accumulator) HasTag(measurement string, key string) bool { } func (a *Accumulator) TagSetValue(measurement string, key string) string { - a.Lock() - defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { v, ok := p.Tags[key] @@ -291,8 +283,6 @@ func (a *Accumulator) TagSetValue(measurement string, key string) string { } func (a *Accumulator) TagValue(measurement string, key string) string { - a.Lock() - defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { v, ok := p.Tags[key] From 01167e585fcdc0e41e639e3df4c55ad04254d474 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Fri, 26 Jun 2020 17:37:01 -0400 Subject: [PATCH 7/7] remove streaming processors docs --- docs/PROCESSORS.md | 75 ---------------------------------------------- 1 file changed, 75 deletions(-) diff --git a/docs/PROCESSORS.md b/docs/PROCESSORS.md index 45a4eb2777c3e..6ea82fdae3309 100644 --- a/docs/PROCESSORS.md +++ b/docs/PROCESSORS.md @@ -64,81 +64,6 @@ func init() { } ``` -### Streaming Processors - -Streaming processors are a new processor type available to you. They are -particularly useful to implement processor types that use background processes -or goroutines to process multiple metrics at the same time. Some examples of this -are the execd processor, which pipes metrics out to an external process over stdin -and reads them back over stdout, and the reverse_dns processor, which does reverse -dns lookups on IP addresses in fields. While both of these come with a speed cost, -it would be significantly worse if you had to process one metric completely from -start to finish before handling the next metric, and thus they benefit -significantly from a streaming-pipe approach. - -Some differences from classic Processors: - -* Streaming processors must conform to the [telegraf.StreamingProcessor][] interface. -* Processors should call `processors.AddStreaming` in their `init` function to register - themselves. See below for a quick example. - -### Streaming Processor Example - -```go -package printer - -// printer.go - -import ( - "fmt" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/processors" -) - -type Printer struct { -} - -var sampleConfig = ` -` - -func (p *Printer) SampleConfig() string { - return sampleConfig -} - -func (p *Printer) Description() string { - return "Print all metrics that pass through this filter." -} - -func (p *Printer) Init() error { - return nil -} - -func (p *Printer) Start(acc telegraf.Accumulator) error { -} - -func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { - // print! - fmt.Println(metric.String()) - - // pass the metric downstream, or metric.Drop() it. - // Metric will be dropped if this function returns an error. - acc.AddMetric(metric) - - return nil -} - -func (p *Printer) Stop() error { -} - -func init() { - processors.AddStreaming("printer", func() telegraf.StreamingProcessor { - return &Printer{} - }) -} -``` - [SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig [CodeStyle]: https://github.com/influxdata/telegraf/wiki/CodeStyle [telegraf.Processor]: https://godoc.org/github.com/influxdata/telegraf#Processor -[telegraf.StreamingProcessor]: https://godoc.org/github.com/influxdata/telegraf#StreamingProcessor