diff --git a/agent/agent.go b/agent/agent.go index a48ed590af228..0ff5893785293 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -510,6 +510,7 @@ func (a *Agent) runProcessors( for m := range unit.src { if err := unit.processor.Add(m, acc); err != nil { acc.AddError(err) + m.Drop() } } unit.processor.Stop() diff --git a/internal/process/process.go b/internal/process/process.go new file mode 100644 index 0000000000000..b7fd77b92d578 --- /dev/null +++ b/internal/process/process.go @@ -0,0 +1,203 @@ +package process + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "os/exec" + "sync" + "time" + + "github.com/influxdata/telegraf" +) + +// Process is a long-running process manager that will restart processes if they stop. +type Process struct { + Cmd *exec.Cmd + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser + ReadStdoutFn func(io.Reader) + ReadStderrFn func(io.Reader) + RestartDelay time.Duration + Log telegraf.Logger + + cancel context.CancelFunc + mainLoopWg sync.WaitGroup +} + +// New creates a new process wrapper +func New(command []string) (*Process, error) { + if len(command) == 0 { + return nil, errors.New("no command") + } + + p := &Process{ + RestartDelay: 5 * time.Second, + } + if len(command) > 1 { + p.Cmd = exec.Command(command[0], command[1:]...) 
+ } else { + p.Cmd = exec.Command(command[0]) + } + var err error + p.Stdin, err = p.Cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("error opening stdin pipe: %w", err) + } + + p.Stdout, err = p.Cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("error opening stdout pipe: %w", err) + } + + p.Stderr, err = p.Cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("error opening stderr pipe: %w", err) + } + + return p, nil +} + +// Start the process +func (p *Process) Start() error { + ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel + + if err := p.cmdStart(); err != nil { + return err + } + + p.mainLoopWg.Add(1) + go func() { + if err := p.cmdLoop(ctx); err != nil { + p.Log.Errorf("Process quit with message: %v", err) + } + p.mainLoopWg.Done() + }() + + return nil +} + +func (p *Process) Stop() { + if p.cancel != nil { + p.cancel() + } + p.mainLoopWg.Wait() +} + +// cmdStart launches p.Cmd. An exec.Cmd can only be started once, so when the +// current command has already been started (a restart) it is first replaced +// by a fresh, unstarted command with new stdio pipes. +func (p *Process) cmdStart() error { + if p.Cmd.Process != nil { + if err := p.resetCmd(); err != nil { + return err + } + } + + p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args) + + if err := p.Cmd.Start(); err != nil { + return fmt.Errorf("error starting process: %w", err) + } + + return nil +} + +// resetCmd replaces p.Cmd with an unstarted copy of itself (same path, +// arguments, environment, working directory and SysProcAttr) and opens new +// stdin, stdout and stderr pipes on it. +// NOTE(review): this reassigns Cmd/Stdin/Stdout/Stderr from the cmdLoop +// goroutine; callers that use those fields from other goroutines may need +// synchronization - confirm. +func (p *Process) resetCmd() error { + old := p.Cmd + p.Cmd = &exec.Cmd{ + Path: old.Path, + Args: old.Args, + Env: old.Env, + Dir: old.Dir, + SysProcAttr: old.SysProcAttr, + } + + var err error + if p.Stdin, err = p.Cmd.StdinPipe(); err != nil { + return fmt.Errorf("error opening stdin pipe: %w", err) + } + if p.Stdout, err = p.Cmd.StdoutPipe(); err != nil { + return fmt.Errorf("error opening stdout pipe: %w", err) + } + if p.Stderr, err = p.Cmd.StderrPipe(); err != nil { + return fmt.Errorf("error opening stderr pipe: %w", err) + } + return nil +} + +// cmdLoop watches an already running process, restarting it when appropriate. 
+func (p *Process) cmdLoop(ctx context.Context) error { + go func() { + <-ctx.Done() + if p.Stdin != nil { + p.Stdin.Close() + gracefulStop(p.Cmd, 5*time.Second) + } + }() + + for { + err := p.cmdWait() + if isQuitting(ctx) { + p.Log.Infof("Process %s shut down", p.Cmd.Path) + return nil + } + + p.Log.Errorf("Process %s exited: %v", p.Cmd.Path, err) + p.Log.Infof("Restarting in %s...", time.Duration(p.RestartDelay)) + + select { + case <-ctx.Done(): + return nil + case <-time.After(time.Duration(p.RestartDelay)): + // Continue the loop and restart the process + if err := p.cmdStart(); err != nil { + return err + } + } + } +} + +func (p *Process) cmdWait() error { + var wg sync.WaitGroup + + if p.ReadStdoutFn == nil { + p.ReadStdoutFn = defaultReadPipe + } + if p.ReadStderrFn == nil { + p.ReadStderrFn = defaultReadPipe + } + + wg.Add(1) + go func() { + p.ReadStdoutFn(p.Stdout) + wg.Done() + }() + + wg.Add(1) + go func() { + p.ReadStderrFn(p.Stderr) + wg.Done() + }() + + wg.Wait() + return p.Cmd.Wait() +} + +func isQuitting(ctx context.Context) bool { + return ctx.Err() != nil +} + +func defaultReadPipe(r io.Reader) { + io.Copy(ioutil.Discard, r) +} diff --git a/internal/process/process_posix.go b/internal/process/process_posix.go new file mode 100644 index 0000000000000..f459e00e2fa6c --- /dev/null +++ b/internal/process/process_posix.go @@ -0,0 +1,28 @@ +// +build !windows + +package process + +import ( + "os/exec" + "syscall" + "time" +) + +// gracefulStop gives the process timeout to exit on its own; after that it is +// sent SIGTERM and, if a second timeout elapses, killed. +// +// cmd.ProcessState stays nil until Wait has returned, so a nil ProcessState +// means the process may still be running and must not suppress the signals. +// Signal and Kill return an error once the process has been reaped; that +// error is deliberately ignored. +func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { + time.AfterFunc(timeout, func() { + if cmd.Process == nil { + return + } + cmd.Process.Signal(syscall.SIGTERM) + time.AfterFunc(timeout, func() { + cmd.Process.Kill() + }) + }) +} diff --git a/internal/process/process_windows.go b/internal/process/process_windows.go new file mode 100644 index 0000000000000..fc110841561f9 --- /dev/null +++ 
b/internal/process/process_windows.go @@ -0,0 +1,23 @@ +// +build windows + +package process + +import ( + "os/exec" + "time" +) + +// gracefulStop gives the process timeout to exit on its own and then kills it. +// +// cmd.ProcessState stays nil until Wait has returned, so a nil ProcessState +// means the process may still be running and must not suppress the kill. +// Kill returns an error once the process has been reaped; that error is +// deliberately ignored. +func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { + time.AfterFunc(timeout, func() { + if cmd.Process == nil { + return + } + cmd.Process.Kill() + }) +} diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index ca9e589d95fde..00479cb3e304f 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -2,16 +2,14 @@ package execd import ( "bufio" - "context" + "errors" "fmt" "io" - "log" - "os/exec" - "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/process" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/influx" @@ -45,15 +43,11 @@ type Execd struct { Command []string Signal string RestartDelay config.Duration + Log telegraf.Logger - acc telegraf.Accumulator - cmd *exec.Cmd - parser parsers.Parser - stdin io.WriteCloser - stdout io.ReadCloser - stderr io.ReadCloser - cancel context.CancelFunc - mainLoopWg sync.WaitGroup + process *process.Process + acc telegraf.Accumulator + parser parsers.Parser } func (e *Execd) SampleConfig() string { @@ -70,131 +64,25 @@ func (e *Execd) SetParser(parser parsers.Parser) { func (e *Execd) Start(acc telegraf.Accumulator) error { e.acc = acc - - if len(e.Command) == 0 { - return fmt.Errorf("FATAL no command specified") - } - - e.mainLoopWg.Add(1) - - ctx, cancel := context.WithCancel(context.Background()) - e.cancel = cancel - - if err := e.cmdStart(); err != nil { - return err - } - - go func() { - if err := e.cmdLoop(ctx); err != nil { - log.Printf("Process quit with message: %s", err.Error()) - } - e.mainLoopWg.Done() - }() - - return nil -} - -func (e *Execd) Stop() { - // don't try to stop before all stream readers have started. 
- e.cancel() - e.mainLoopWg.Wait() -} - -// cmdLoop watches an already running process, restarting it when appropriate. -func (e *Execd) cmdLoop(ctx context.Context) error { - for { - // Use a buffered channel to ensure goroutine below can exit - // if `ctx.Done` is selected and nothing reads on `done` anymore - done := make(chan error, 1) - go func() { - done <- e.cmdWait() - }() - - select { - case <-ctx.Done(): - if e.stdin != nil { - e.stdin.Close() - gracefulStop(e.cmd, 5*time.Second) - } - return nil - case err := <-done: - log.Printf("Process %s terminated: %s", e.Command, err) - if isQuitting(ctx) { - return err - } - } - - log.Printf("Restarting in %s...", time.Duration(e.RestartDelay)) - - select { - case <-ctx.Done(): - return nil - case <-time.After(time.Duration(e.RestartDelay)): - // Continue the loop and restart the process - if err := e.cmdStart(); err != nil { - return err - } - } - } -} - -func isQuitting(ctx context.Context) bool { - select { - case <-ctx.Done(): - return true - default: - return false - } -} - -func (e *Execd) cmdStart() (err error) { - if len(e.Command) > 1 { - e.cmd = exec.Command(e.Command[0], e.Command[1:]...) 
- } else { - e.cmd = exec.Command(e.Command[0]) - } - - e.stdin, err = e.cmd.StdinPipe() - if err != nil { - return fmt.Errorf("Error opening stdin pipe: %s", err) - } - - e.stdout, err = e.cmd.StdoutPipe() + var err error + e.process, err = process.New(e.Command) if err != nil { - return fmt.Errorf("Error opening stdout pipe: %s", err) + return fmt.Errorf("error creating new process: %w", err) } + e.process.Log = e.Log + e.process.RestartDelay = time.Duration(e.RestartDelay) + e.process.ReadStdoutFn = e.cmdReadOut + e.process.ReadStderrFn = e.cmdReadErr - e.stderr, err = e.cmd.StderrPipe() - if err != nil { - return fmt.Errorf("Error opening stderr pipe: %s", err) - } - - log.Printf("Starting process: %s", e.Command) - - err = e.cmd.Start() - if err != nil { - return fmt.Errorf("Error starting process: %s", err) + if err = e.process.Start(); err != nil { + return fmt.Errorf("failed to start process %s: %w", e.Command, err) } return nil } -func (e *Execd) cmdWait() error { - var wg sync.WaitGroup - wg.Add(2) - - go func() { - e.cmdReadOut(e.stdout) - wg.Done() - }() - - go func() { - e.cmdReadErr(e.stderr) - wg.Done() - }() - - wg.Wait() - return e.cmd.Wait() +func (e *Execd) Stop() { + e.process.Stop() } func (e *Execd) cmdReadOut(out io.Reader) { @@ -209,7 +97,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { for scanner.Scan() { metrics, err := e.parser.Parse(scanner.Bytes()) if err != nil { - e.acc.AddError(fmt.Errorf("Parse error: %s", err)) + e.acc.AddError(fmt.Errorf("parse error: %w", err)) } for _, metric := range metrics { @@ -218,7 +106,7 @@ func (e *Execd) cmdReadOut(out io.Reader) { } if err := scanner.Err(); err != nil { - e.acc.AddError(fmt.Errorf("Error reading stdout: %s", err)) + e.acc.AddError(fmt.Errorf("error reading stdout: %w", err)) } } @@ -249,12 +137,19 @@ func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) for scanner.Scan() { - log.Printf("stderr: %q", scanner.Text()) + e.Log.Errorf("stderr: %q", scanner.Text()) } 
if err := scanner.Err(); err != nil { - e.acc.AddError(fmt.Errorf("Error reading stderr: %s", err)) + e.acc.AddError(fmt.Errorf("error reading stderr: %w", err)) + } +} + +func (e *Execd) Init() error { + if len(e.Command) == 0 { + return errors.New("no command specified") } + return nil } func init() { diff --git a/plugins/inputs/execd/execd_posix.go b/plugins/inputs/execd/execd_posix.go index cc3a8e8bb5aac..4d8789a8d3215 100644 --- a/plugins/inputs/execd/execd_posix.go +++ b/plugins/inputs/execd/execd_posix.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "os" - "os/exec" "syscall" "time" @@ -14,22 +13,26 @@ import ( ) func (e *Execd) Gather(acc telegraf.Accumulator) error { - if e.cmd == nil || e.cmd.Process == nil { + if e.process == nil || e.process.Cmd == nil { return nil } + osProcess := e.process.Cmd.Process + if osProcess == nil { + return nil + } switch e.Signal { case "SIGHUP": - e.cmd.Process.Signal(syscall.SIGHUP) + osProcess.Signal(syscall.SIGHUP) case "SIGUSR1": - e.cmd.Process.Signal(syscall.SIGUSR1) + osProcess.Signal(syscall.SIGUSR1) case "SIGUSR2": - e.cmd.Process.Signal(syscall.SIGUSR2) + osProcess.Signal(syscall.SIGUSR2) case "STDIN": - if osStdin, ok := e.stdin.(*os.File); ok { + if osStdin, ok := e.process.Stdin.(*os.File); ok { osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second)) } - if _, err := io.WriteString(e.stdin, "\n"); err != nil { + if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil { return fmt.Errorf("Error writing to stdin: %s", err) } case "none": @@ -39,11 +42,3 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { return nil } - -func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { - cmd.Process.Signal(syscall.SIGTERM) - go func() { - <-time.NewTimer(timeout).C - cmd.Process.Kill() - }() -} diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 52c0a214b6ea6..ce046568c6dcf 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -11,6 
+11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/models" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/parsers" @@ -19,15 +20,16 @@ import ( ) func TestExternalInputWorks(t *testing.T) { - jsonParser, err := parsers.NewInfluxParser() + influxParser, err := parsers.NewInfluxParser() require.NoError(t, err) e := &Execd{ Command: []string{shell(), fileShellScriptPath()}, RestartDelay: config.Duration(5 * time.Second), - parser: jsonParser, + parser: influxParser, Signal: "STDIN", } + e.Log = testutil.Logger{} metrics := make(chan telegraf.Metric, 10) defer close(metrics) @@ -64,6 +66,7 @@ func TestParsesLinesContainingNewline(t *testing.T) { Signal: "STDIN", acc: acc, } + e.Log = testutil.Logger{} cases := []struct { Name string diff --git a/plugins/inputs/execd/execd_windows.go b/plugins/inputs/execd/execd_windows.go index 82935d4ac87c3..15e6798f2389b 100644 --- a/plugins/inputs/execd/execd_windows.go +++ b/plugins/inputs/execd/execd_windows.go @@ -6,23 +6,22 @@ import ( "fmt" "io" "os" - "os/exec" "time" "github.com/influxdata/telegraf" ) func (e *Execd) Gather(acc telegraf.Accumulator) error { - if e.cmd == nil || e.cmd.Process == nil { + if e.process == nil { return nil } switch e.Signal { case "STDIN": - if osStdin, ok := e.stdin.(*os.File); ok { + if osStdin, ok := e.process.Stdin.(*os.File); ok { osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second)) } - if _, err := io.WriteString(e.stdin, "\n"); err != nil { + if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil { return fmt.Errorf("Error writing to stdin: %s", err) } case "none": @@ -32,7 +31,3 @@ func (e *Execd) Gather(acc telegraf.Accumulator) error { return nil } - -func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { - cmd.Process.Kill() -} diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go 
index 5e65c1d39d35a..f0720a7ac524e 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -235,10 +235,6 @@ func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) require.NoError(t, receiver.Start(acc)) defer receiver.Stop() - // Clear - acc.ClearMetrics() - acc.Errors = make([]error, 0) - // Connect conn, err := net.Dial(protocol, address) require.NotNil(t, conn) diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 6dc2e2b0d6589..ad8d8616c2ba1 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/dedup" _ "github.com/influxdata/telegraf/plugins/processors/defaults" _ "github.com/influxdata/telegraf/plugins/processors/enum" + _ "github.com/influxdata/telegraf/plugins/processors/execd" _ "github.com/influxdata/telegraf/plugins/processors/filepath" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" diff --git a/plugins/processors/execd/README.md b/plugins/processors/execd/README.md new file mode 100644 index 0000000000000..f1fdb0b85f9ff --- /dev/null +++ b/plugins/processors/execd/README.md @@ -0,0 +1,110 @@ +# Execd Processor Plugin + +The `execd` processor plugin runs an external program as a separate process and +pipes metrics in to the process's STDIN and reads processed metrics from its STDOUT. +The programs must accept influx line protocol on standard in (STDIN) and output +metrics in influx line protocol to standard output (STDOUT). + +Program output on standard error is mirrored to the telegraf log. + +### Caveats + +- Metrics with tracking will be considered "delivered" as soon as they are passed + to the external process. 
There is currently no way to match up which metric + coming out of the execd process relates to which metric going in (keep in mind + that processors can add and drop metrics, and that this is all done + asynchronously). +- It's not currently possible to use a data_format other than "influx", due to + the requirement that it is serialize-parse symmetrical and does not lose any + critical type data. + +### Configuration: + +```toml +[[processors.execd]] + ## Program to run as daemon + ## eg: command = ["/path/to/your_program", "arg1", "arg2"] + command = ["cat"] + + ## Delay before the process is restarted after an unexpected termination + # restart_delay = "10s" +``` + +### Example + +#### Go daemon example + +This Go daemon reads a metric from stdin, multiplies the "count" field by 2, +and writes the metric back out. + +```go +package main + +import ( + "fmt" + "os" + + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/serializers" +) + +func main() { + parser := influx.NewStreamParser(os.Stdin) + serializer, _ := serializers.NewInfluxSerializer() + + for { + metric, err := parser.Next() + if err != nil { + if err == influx.EOF { + return // stream ended + } + if parseErr, isParseError := err.(*influx.ParseError); isParseError { + fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) + os.Exit(1) + } + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + + c, found := metric.GetField("count") + if !found { + fmt.Fprintf(os.Stderr, "metric has no count field\n") + os.Exit(1) + } + switch t := c.(type) { + case float64: + t *= 2 + metric.AddField("count", t) + case int64: + t *= 2 + metric.AddField("count", t) + default: + fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c) + os.Exit(1) + } + b, err := serializer.Serialize(metric) + if err != nil { + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + fmt.Fprint(os.Stdout, string(b)) + } +} +``` + 
+To run it, you'd build the binary using go, e.g. `go build -o multiplier.exe main.go` + +```toml +[[processors.execd]] + command = ["multiplier.exe"] +``` + +#### Ruby daemon + +- See [Ruby daemon](./examples/multiplier_line_protocol/multiplier_line_protocol.rb) + +```toml +[[processors.execd]] + command = ["ruby", "plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb"] +``` diff --git a/plugins/processors/execd/examples/multiplier_line_protocol/multiplier.conf b/plugins/processors/execd/examples/multiplier_line_protocol/multiplier.conf new file mode 100644 index 0000000000000..120e04cbb73f1 --- /dev/null +++ b/plugins/processors/execd/examples/multiplier_line_protocol/multiplier.conf @@ -0,0 +1,14 @@ +[agent] + interval = "10s" + +[[inputs.execd]] + command = ["ruby", "plugins/inputs/execd/examples/count.rb"] + +[[processors.execd]] + command = ["ruby", "plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb"] + + +[[outputs.file]] + files = ["stdout"] + data_format = "influx" + diff --git a/plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb b/plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb new file mode 100644 index 0000000000000..6949a310e8754 --- /dev/null +++ b/plugins/processors/execd/examples/multiplier_line_protocol/multiplier_line_protocol.rb @@ -0,0 +1,27 @@ +#!/usr/bin/env ruby + +loop do + # example input: "counter_ruby count=0 1586302128978187000" + line = STDIN.readline.chomp + # parse out influx line protocol sections with a really simple hand-rolled parser that doesn't support escaping. + # for a full line parser in ruby, check out something like the influxdb-lineprotocol-parser gem. 
+ parts = line.split(" ") + case parts.size + when 3 + measurement, fields, timestamp = parts + when 4 + measurement, tags, fields, timestamp = parts + else + STDERR.puts "Unable to parse line protocol" + exit 1 + end + fields = fields.split(",").map{|t| + k,v = t.split("=") + if k == "count" + v = v.to_i * 2 # multiple count metric by two + end + "#{k}=#{v}" + }.join(",") + puts [measurement, tags, fields, timestamp].select{|s| s && s.size != 0 }.join(" ") + STDOUT.flush +end diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go new file mode 100644 index 0000000000000..5e4bbc53f0274 --- /dev/null +++ b/plugins/processors/execd/execd.go @@ -0,0 +1,153 @@ +package execd + +import ( + "bufio" + "errors" + "fmt" + "io" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/process" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/processors" + "github.com/influxdata/telegraf/plugins/serializers" +) + +const sampleConfig = ` + ## Program to run as daemon + ## eg: command = ["/path/to/your_program", "arg1", "arg2"] + command = ["cat"] + + ## Delay before the process is restarted after an unexpected termination + restart_delay = "10s" +` + +type Execd struct { + Command []string `toml:"command"` + RestartDelay config.Duration `toml:"restart_delay"` + Log telegraf.Logger + + parserConfig *parsers.Config + parser parsers.Parser + serializerConfig *serializers.Config + serializer serializers.Serializer + acc telegraf.Accumulator + process *process.Process +} + +func New() *Execd { + return &Execd{ + RestartDelay: config.Duration(10 * time.Second), + parserConfig: &parsers.Config{ + DataFormat: "influx", + }, + serializerConfig: &serializers.Config{ + DataFormat: "influx", + }, + } +} + +func (e *Execd) SampleConfig() string { + return sampleConfig +} + +func (e *Execd) Description() string { + return "Run executable as 
long-running processor plugin" +} + +func (e *Execd) Start(acc telegraf.Accumulator) error { + var err error + e.parser, err = parsers.NewParser(e.parserConfig) + if err != nil { + return fmt.Errorf("error creating parser: %w", err) + } + e.serializer, err = serializers.NewSerializer(e.serializerConfig) + if err != nil { + return fmt.Errorf("error creating serializer: %w", err) + } + e.acc = acc + + e.process, err = process.New(e.Command) + if err != nil { + return fmt.Errorf("error creating new process: %w", err) + } + e.process.Log = e.Log + e.process.RestartDelay = time.Duration(e.RestartDelay) + e.process.ReadStdoutFn = e.cmdReadOut + e.process.ReadStderrFn = e.cmdReadErr + + if err = e.process.Start(); err != nil { + return fmt.Errorf("failed to start process %s: %w", e.Command, err) + } + + return nil +} + +func (e *Execd) Add(m telegraf.Metric, acc telegraf.Accumulator) error { + b, err := e.serializer.Serialize(m) + if err != nil { + return fmt.Errorf("metric serializing error: %w", err) + } + + _, err = e.process.Stdin.Write(b) + if err != nil { + return fmt.Errorf("error writing to process stdin: %w", err) + } + + // We cannot maintain tracking metrics at the moment because input/output + // is done asynchronously and we don't have any metric metadata to tie the + // output metric back to the original input metric. 
+ m.Drop() + return nil +} + +func (e *Execd) Stop() error { + e.process.Stop() + return nil +} + +func (e *Execd) cmdReadOut(out io.Reader) { + scanner := bufio.NewScanner(out) + + for scanner.Scan() { + metrics, err := e.parser.Parse(scanner.Bytes()) + if err != nil { + e.Log.Errorf("Parse error: %s", err) + } + + for _, metric := range metrics { + e.acc.AddMetric(metric) + } + } + + if err := scanner.Err(); err != nil { + e.Log.Errorf("Error reading stdout: %s", err) + } +} + +func (e *Execd) cmdReadErr(out io.Reader) { + scanner := bufio.NewScanner(out) + + for scanner.Scan() { + e.Log.Errorf("stderr: %q", scanner.Text()) + } + + if err := scanner.Err(); err != nil { + e.Log.Errorf("Error reading stderr: %s", err) + } +} + +func (e *Execd) Init() error { + if len(e.Command) == 0 { + return errors.New("no command specified") + } + return nil +} + +func init() { + processors.AddStreaming("execd", func() telegraf.StreamingProcessor { + return New() + }) +} diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go new file mode 100644 index 0000000000000..6f9c6b36b1639 --- /dev/null +++ b/plugins/processors/execd/execd_test.go @@ -0,0 +1,135 @@ +package execd + +import ( + "flag" + "fmt" + "os" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestExternalProcessorWorks(t *testing.T) { + e := New() + e.Log = testutil.Logger{} + + exe, err := os.Executable() + require.NoError(t, err) + t.Log(exe) + e.Command = []string{exe, "-countmultiplier"} + e.RestartDelay = config.Duration(5 * time.Second) + + acc := &testutil.Accumulator{} + + require.NoError(t, e.Start(acc)) + + now := time.Now() + orig := now + metrics := []telegraf.Metric{} + 
for i := 0; i < 10; i++ { + m, err := metric.New("test", + map[string]string{ + "city": "Toronto", + }, + map[string]interface{}{ + "population": 6000000, + "count": 1, + }, + now) + require.NoError(t, err) + metrics = append(metrics, m) + now = now.Add(1) + + e.Add(m, acc) + } + + acc.Wait(1) + require.NoError(t, e.Stop()) + + metrics = acc.GetTelegrafMetrics() + m := metrics[0] + + expected := testutil.MustMetric("test", + map[string]string{ + "city": "Toronto", + }, + map[string]interface{}{ + "population": 6000000, + "count": 2, + }, + orig, + ) + testutil.RequireMetricEqual(t, expected, m) + + metricTime := m.Time().UnixNano() + + // read the other 9 and make sure they're ordered properly + for i := 0; i < 9; i++ { + m = metrics[i+1] + require.EqualValues(t, metricTime+1, m.Time().UnixNano()) + metricTime = m.Time().UnixNano() + } +} + +var countmultiplier = flag.Bool("countmultiplier", false, + "if true, act like line input program instead of test") + +func TestMain(m *testing.M) { + flag.Parse() + if *countmultiplier { + runCountMultiplierProgram() + os.Exit(0) + } + code := m.Run() + os.Exit(code) +} + +func runCountMultiplierProgram() { + parser := influx.NewStreamParser(os.Stdin) + serializer, _ := serializers.NewInfluxSerializer() + + for { + metric, err := parser.Next() + if err != nil { + if err == influx.EOF { + return // stream ended + } + if parseErr, isParseError := err.(*influx.ParseError); isParseError { + fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) + os.Exit(1) + } + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + + c, found := metric.GetField("count") + if !found { + fmt.Fprintf(os.Stderr, "metric has no count field\n") + os.Exit(1) + } + switch t := c.(type) { + case float64: + t *= 2 + metric.AddField("count", t) + case int64: + t *= 2 + metric.AddField("count", t) + default: + fmt.Fprintf(os.Stderr, "count is not an unknown type, it's a %T\n", c) + os.Exit(1) + } + b, err := serializer.Serialize(metric) + if err != nil { + 
fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + fmt.Fprint(os.Stdout, string(b)) + } +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 6e5148ef730f2..b05fd464c7260 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -323,13 +323,13 @@ func (a *Accumulator) NFields() int { // Wait waits for the given number of metrics to be added to the accumulator. func (a *Accumulator) Wait(n int) { a.Lock() + defer a.Unlock() if a.Cond == nil { a.Cond = sync.NewCond(&a.Mutex) } for int(a.NMetrics()) < n { a.Cond.Wait() } - a.Unlock() } // WaitError waits for the given number of errors to be added to the accumulator.