Set a timeout for calls to input.Gather
Changing the internal behavior around running plugins. Each plugin
now has its own goroutine with its own ticker, so a hung plugin will
not block any other plugins. When a plugin hangs, we log an error
message every interval, letting users know which plugin is hung.
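
As a rough sketch, the new scheduling model looks like the following
(hypothetical names: runPlugins and Gatherer stand in for the agent's Run
loop and telegraf.Input; the real wiring is in agent/agent.go below, and
gatherWithTimeout is sketched after the next paragraph):

	package main

	import (
		"log" // used by gatherWithTimeout, sketched below
		"sync"
		"time"
	)

	// Gatherer is a stand-in for the Gather method of telegraf.Input.
	type Gatherer interface {
		Gather() error
	}

	// runPlugins gives every plugin its own goroutine and its own ticker,
	// so one hung Gather call cannot delay any other plugin.
	func runPlugins(shutdown chan struct{}, plugins map[string]Gatherer, interval time.Duration) {
		var wg sync.WaitGroup
		wg.Add(len(plugins))
		for name, p := range plugins {
			go func(name string, p Gatherer) {
				defer wg.Done()
				ticker := time.NewTicker(interval)
				defer ticker.Stop()
				for {
					gatherWithTimeout(shutdown, name, p, interval)
					select {
					case <-shutdown:
						return
					case <-ticker.C:
					}
				}
			}(name, p)
		}
		wg.Wait()
	}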

Currently the input interface does not have any methods for killing a
running Gather call, so there is nothing we can do but log an "ERROR"
and move on. This will give some visibility into the plugin that is
acting up.
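
Continuing the sketch above, the timeout wrapper has the same shape as the
new gatherWithTimeout in agent/agent.go: run Gather on its own goroutine,
and once it outlives the interval, log an error on every tick while still
waiting for the original call to return:

	// gatherWithTimeout runs a single Gather. If it takes longer than
	// timeout, an error is logged on every tick, but we keep waiting so
	// that the same hung plugin is never invoked a second time.
	func gatherWithTimeout(shutdown chan struct{}, name string, p Gatherer, timeout time.Duration) {
		ticker := time.NewTicker(timeout)
		defer ticker.Stop()

		done := make(chan error)
		go func() { done <- p.Gather() }()

		for {
			select {
			case err := <-done:
				if err != nil {
					log.Printf("ERROR in input [%s]: %s", name, err)
				}
				return
			case <-ticker.C:
				log.Printf("ERROR: input [%s] took longer to collect than interval (%s)",
					name, timeout)
			case <-shutdown:
				return
			}
		}
	}

On shutdown the hung Gather goroutine is simply abandoned; since the
process is exiting anyway, that trade-off is acceptable here.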

closes #1230
fixes #479
sparrc committed May 21, 2016
1 parent 56aee1c commit ae54ec2
Showing 2 changed files with 81 additions and 92 deletions.
15 changes: 11 additions & 4 deletions agent/accumulator.go

@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"log"
 	"math"
-	"sync"
 	"time"
 
 	"github.com/influxdata/telegraf"
@@ -22,13 +21,13 @@ func NewAccumulator(
 }
 
 type accumulator struct {
-	sync.Mutex
-
 	metrics chan telegraf.Metric
 
 	defaultTags map[string]string
 
 	debug bool
+	// print every point added to the accumulator
+	trace bool
 
 	inputConfig *internal_models.InputConfig
@@ -152,7 +151,7 @@ func (ac *accumulator) AddFields(
 		log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
 		return
 	}
-	if ac.debug {
+	if ac.trace {
 		fmt.Println("> " + m.String())
 	}
 	ac.metrics <- m
@@ -166,6 +165,14 @@ func (ac *accumulator) SetDebug(debug bool) {
 	ac.debug = debug
 }
 
+func (ac *accumulator) Trace() bool {
+	return ac.trace
+}
+
+func (ac *accumulator) SetTrace(trace bool) {
+	ac.trace = trace
+}
+
 func (ac *accumulator) setDefaultTags(tags map[string]string) {
 	ac.defaultTags = tags
 }
158 changes: 70 additions & 88 deletions agent/agent.go

@@ -102,71 +102,20 @@ func panicRecover(input *internal_models.RunningInput) {
 	}
 }
 
-// gatherParallel runs the inputs that are using the same reporting interval
-// as the telegraf agent.
-func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error {
-	var wg sync.WaitGroup
-
-	start := time.Now()
-	counter := 0
-	jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
-	for _, input := range a.Config.Inputs {
-		if input.Config.Interval != 0 {
-			continue
-		}
-
-		wg.Add(1)
-		counter++
-		go func(input *internal_models.RunningInput) {
-			defer panicRecover(input)
-			defer wg.Done()
-
-			acc := NewAccumulator(input.Config, metricC)
-			acc.SetDebug(a.Config.Agent.Debug)
-			acc.setDefaultTags(a.Config.Tags)
-
-			if jitter != 0 {
-				nanoSleep := rand.Int63n(jitter)
-				d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
-				if err != nil {
-					log.Printf("Jittering collection interval failed for plugin %s",
-						input.Name)
-				} else {
-					time.Sleep(d)
-				}
-			}
-
-			if err := input.Input.Gather(acc); err != nil {
-				log.Printf("Error in input [%s]: %s", input.Name, err)
-			}
-
-		}(input)
-	}
-
-	if counter == 0 {
-		return nil
-	}
-
-	wg.Wait()
-
-	elapsed := time.Since(start)
-	if !a.Config.Agent.Quiet {
-		log.Printf("Gathered metrics, (%s interval), from %d inputs in %s\n",
-			a.Config.Agent.Interval.Duration, counter, elapsed)
-	}
-	return nil
-}
-
-// gatherSeparate runs the inputs that have been configured with their own
+// gatherer runs the inputs that have been configured with their own
 // reporting interval.
-func (a *Agent) gatherSeparate(
+func (a *Agent) gatherer(
 	shutdown chan struct{},
 	input *internal_models.RunningInput,
+	interval time.Duration,
 	metricC chan telegraf.Metric,
 ) error {
 	defer panicRecover(input)
 
-	ticker := time.NewTicker(input.Config.Interval)
+	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 
+	jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
+
 	for {
 		var outerr error
@@ -176,14 +125,23 @@ func (a *Agent) gatherSeparate(
 		acc.SetDebug(a.Config.Agent.Debug)
 		acc.setDefaultTags(a.Config.Tags)
 
-		if err := input.Input.Gather(acc); err != nil {
-			log.Printf("Error in input [%s]: %s", input.Name, err)
+		if jitter != 0 {
+			nanoSleep := rand.Int63n(jitter)
+			d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
+			if err != nil {
+				log.Printf("Jittering collection interval failed for plugin %s",
+					input.Name)
+			} else {
+				time.Sleep(d)
+			}
 		}
 
+		gatherWithTimeout(shutdown, input, acc, interval)
+
 		elapsed := time.Since(start)
-		if !a.Config.Agent.Quiet {
-			log.Printf("Gathered metrics, (separate %s interval), from %s in %s\n",
-				input.Config.Interval, input.Name, elapsed)
+		if a.Config.Agent.Debug {
+			log.Printf("Input [%s] Gathered metrics, (%s interval) in %s\n",
+				input.Name, interval, elapsed)
 		}
 
 		if outerr != nil {
@@ -199,6 +157,42 @@ func (a *Agent) gatherSeparate(
 	}
 }
 
+// gatherWithTimeout gathers from the given input, with the given timeout.
+// when the given timeout is reached, gatherWithTimeout logs an error message
+// but continues waiting for it to return. This is to avoid leaving behind
+// hung processes, and to prevent re-calling the same hung process over and
+// over.
+func gatherWithTimeout(
+	shutdown chan struct{},
+	input *internal_models.RunningInput,
+	acc *accumulator,
+	timeout time.Duration,
+) {
+	ticker := time.NewTicker(timeout)
+	defer ticker.Stop()
+	done := make(chan error)
+	go func() {
+		done <- input.Input.Gather(acc)
+	}()
+
+	for {
+		select {
+		case err := <-done:
+			if err != nil {
+				log.Printf("ERROR in input [%s]: %s", input.Name, err)
+			}
+			return
+		case <-ticker.C:
+			log.Printf("ERROR: input [%s] took longer to collect than "+
+				"collection interval (%s)",
+				input.Name, timeout)
+			continue
+		case <-shutdown:
+			return
+		}
+	}
+}
+
 // Test verifies that we can 'Gather' from all inputs with their configured
 // Config struct
 func (a *Agent) Test() error {
@@ -220,7 +214,7 @@ func (a *Agent) Test() error {
 
 	for _, input := range a.Config.Inputs {
 		acc := NewAccumulator(input.Config, metricC)
-		acc.SetDebug(true)
+		acc.SetTrace(true)
 		acc.setDefaultTags(a.Config.Tags)
 
 		fmt.Printf("* Plugin: %s, Collection 1\n", input.Name)
@@ -348,7 +342,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
 		i := int64(a.Config.Agent.Interval.Duration)
 		time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
 	}
-	ticker := time.NewTicker(a.Config.Agent.Interval.Duration)
 
 	wg.Add(1)
 	go func() {
@@ -359,32 +352,21 @@ func (a *Agent) Run(shutdown chan struct{}) error {
 		}
 	}()
 
+	wg.Add(len(a.Config.Inputs))
 	for _, input := range a.Config.Inputs {
-		// Special handling for inputs that have their own collection interval
-		// configured. Default intervals are handled below with gatherParallel
+		interval := a.Config.Agent.Interval.Duration
+		// overwrite global interval if this plugin has it's own.
 		if input.Config.Interval != 0 {
-			wg.Add(1)
-			go func(input *internal_models.RunningInput) {
-				defer wg.Done()
-				if err := a.gatherSeparate(shutdown, input, metricC); err != nil {
-					log.Printf(err.Error())
-				}
-			}(input)
+			interval = input.Config.Interval
 		}
+		go func(in *internal_models.RunningInput, interv time.Duration) {
+			defer wg.Done()
+			if err := a.gatherer(shutdown, in, interv, metricC); err != nil {
+				log.Printf(err.Error())
+			}
+		}(input, interval)
 	}
 
-	defer wg.Wait()
-
-	for {
-		if err := a.gatherParallel(metricC); err != nil {
-			log.Printf(err.Error())
-		}
-
-		select {
-		case <-shutdown:
-			return nil
-		case <-ticker.C:
-			continue
-		}
-	}
+	wg.Wait()
+	return nil
 }
