Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execd processor #7640

Merged
merged 7 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ func (a *Agent) runProcessors(
for m := range unit.src {
if err := unit.processor.Add(m, acc); err != nil {
acc.AddError(err)
m.Drop()
}
}
unit.processor.Stop()
Expand Down
75 changes: 75 additions & 0 deletions docs/PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,81 @@ func init() {
}
```

### Streaming Processors

Streaming processors are a new processor type available to you. They are
particularly useful to implement processor types that use background processes
or goroutines to process multiple metrics at the same time. Some examples of this
are the execd processor, which pipes metrics out to an external process over stdin
and reads them back over stdout, and the reverse_dns processor, which does reverse
dns lookups on IP addresses in fields. While both of these come with a speed cost,
it would be significantly worse if you had to process one metric completely from
start to finish before handling the next metric, and thus they benefit
significantly from a streaming-pipe approach.

Some differences from classic Processors:

* Streaming processors must conform to the [telegraf.StreamingProcessor][] interface.
* Processors should call `processors.AddStreaming` in their `init` function to register
themselves. See below for a quick example.

### Streaming Processor Example

```go
package printer

// printer.go

import (
"fmt"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)

type Printer struct {
}

var sampleConfig = `
`

func (p *Printer) SampleConfig() string {
return sampleConfig
}

func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}

func (p *Printer) Init() error {
return nil
}

func (p *Printer) Start(acc telegraf.Accumulator) error {
}

func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
// print!
fmt.Println(metric.String())

// pass the metric downstream, or metric.Drop() it.
// Metric will be dropped if this function returns an error.
acc.AddMetric(metric)

return nil
}

func (p *Printer) Stop() error {
}

func init() {
processors.AddStreaming("printer", func() telegraf.StreamingProcessor {
return &Printer{}
})
}
```

[SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig
[CodeStyle]: https://github.com/influxdata/telegraf/wiki/CodeStyle
[telegraf.Processor]: https://godoc.org/github.com/influxdata/telegraf#Processor
[telegraf.StreamingProcessor]: https://godoc.org/github.com/influxdata/telegraf#StreamingProcessor
165 changes: 165 additions & 0 deletions internal/process/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package process

import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"os/exec"
"sync"
"time"

"github.com/influxdata/telegraf"
)

// Process is a long-running process manager that will restart processes if they stop.
type Process struct {
Cmd *exec.Cmd
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
ReadStdoutFn func(io.Reader)
ReadStderrFn func(io.Reader)
RestartDelay time.Duration
Log telegraf.Logger

cancel context.CancelFunc
mainLoopWg sync.WaitGroup
}

// New creates a new process wrapper
func New(command []string) (*Process, error) {
if len(command) == 0 {
return nil, errors.New("no command")
}

p := &Process{
RestartDelay: 5 * time.Second,
}
if len(command) > 1 {
p.Cmd = exec.Command(command[0], command[1:]...)
} else {
p.Cmd = exec.Command(command[0])
}
var err error
p.Stdin, err = p.Cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("error opening stdin pipe: %w", err)
}

p.Stdout, err = p.Cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("error opening stdout pipe: %w", err)
}

p.Stderr, err = p.Cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("error opening stderr pipe: %w", err)
}

return p, nil
}

// Start the process
func (p *Process) Start() error {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

if err := p.cmdStart(); err != nil {
return err
}

p.mainLoopWg.Add(1)
go func() {
if err := p.cmdLoop(ctx); err != nil {
p.Log.Errorf("Process quit with message: %v", err)
}
p.mainLoopWg.Done()
}()

return nil
}

func (p *Process) Stop() {
if p.cancel != nil {
p.cancel()
}
p.mainLoopWg.Wait()
}

func (p *Process) cmdStart() error {
p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args)

if err := p.Cmd.Start(); err != nil {
return fmt.Errorf("error starting process: %s", err)
}

return nil
}

// cmdLoop watches an already running process, restarting it when appropriate.
func (p *Process) cmdLoop(ctx context.Context) error {
ssoroka marked this conversation as resolved.
Show resolved Hide resolved
go func() {
<-ctx.Done()
if p.Stdin != nil {
p.Stdin.Close()
gracefulStop(p.Cmd, 5*time.Second)
}
}()

for {
err := p.cmdWait()
if isQuitting(ctx) {
p.Log.Infof("Process %s shut down", p.Cmd.Path)
return nil
}

p.Log.Errorf("Process %s exited: %v", p.Cmd.Path, err)
p.Log.Infof("Restarting in %s...", time.Duration(p.RestartDelay))

select {
case <-ctx.Done():
return nil
case <-time.After(time.Duration(p.RestartDelay)):
// Continue the loop and restart the process
if err := p.cmdStart(); err != nil {
return err
}
}
}
}

func (p *Process) cmdWait() error {
var wg sync.WaitGroup

if p.ReadStdoutFn == nil {
p.ReadStdoutFn = defaultReadPipe
}
if p.ReadStderrFn == nil {
p.ReadStderrFn = defaultReadPipe
}

wg.Add(1)
go func() {
p.ReadStdoutFn(p.Stdout)
wg.Done()
}()

wg.Add(1)
go func() {
p.ReadStderrFn(p.Stderr)
wg.Done()
}()

wg.Wait()
return p.Cmd.Wait()
}

func isQuitting(ctx context.Context) bool {
ssoroka marked this conversation as resolved.
Show resolved Hide resolved
return ctx.Err() != nil
}

func defaultReadPipe(r io.Reader) {
io.Copy(ioutil.Discard, r)
}
28 changes: 28 additions & 0 deletions internal/process/process_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// +build !windows

package process

import (
"os/exec"
"syscall"
"time"
)

func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
}
})
}
19 changes: 19 additions & 0 deletions internal/process/process_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// +build windows

package process

import (
"os/exec"
"time"
)

func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
}
Loading