Add Accumulator to the ServiceInput Start() function
closes #666
sparrc committed Feb 17, 2016
1 parent 7f539c9 commit 9d3eaa5
Showing 13 changed files with 234 additions and 254 deletions.
20 changes: 12 additions & 8 deletions agent/agent.go
@@ -58,7 +58,8 @@ func (a *Agent) Connect() error {
}
err := o.Output.Connect()
if err != nil {
log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err)
log.Printf("Failed to connect to output %s, retrying in 15s, "+
"error was '%s' \n", o.Name, err)
time.Sleep(15 * time.Second)
err = o.Output.Connect()
if err != nil {
@@ -241,7 +242,7 @@ func (a *Agent) Test() error {
return nil
}

// flush writes a list of points to all configured outputs
// flush writes a list of metrics to all configured outputs
func (a *Agent) flush() {
var wg sync.WaitGroup

@@ -260,7 +261,7 @@ func (a *Agent) flush() {
wg.Wait()
}

// flusher monitors the points input channel and flushes on the minimum interval
// flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
@@ -271,14 +272,14 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
log.Println("Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
a.flush()
case m := <-metricC:
for _, o := range a.Config.Outputs {
o.AddPoint(m)
o.AddMetric(m)
}
}
}
@@ -318,8 +319,8 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)

// channel shared between all input threads for accumulating points
metricC := make(chan telegraf.Metric, 1000)
// channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 10000)

// Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval {
@@ -342,7 +343,10 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
if err := p.Start(); err != nil {
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
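
For orientation, a self-contained sketch of the pattern Agent.flusher uses (stand-in types and names, not the agent's actual code): service inputs push metrics onto a shared buffered channel, the flusher fans each metric out to every output, a ticker drives periodic flushes, and a final flush runs on shutdown.

package main

import (
	"fmt"
	"time"
)

// metric and output are stand-ins for telegraf.Metric and the running outputs.
type metric struct{ name string }

type output struct{ buf []metric }

func (o *output) addMetric(m metric) { o.buf = append(o.buf, m) }

func (o *output) flush() {
	fmt.Printf("wrote %d metrics\n", len(o.buf))
	o.buf = nil
}

// flusher mirrors the select loop above: drain the metric channel into the
// outputs, flush on every tick, and flush once more when shutdown closes.
func flusher(shutdown chan struct{}, metricC chan metric, outs []*output, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-shutdown:
			for _, o := range outs {
				o.flush()
			}
			return
		case <-ticker.C:
			for _, o := range outs {
				o.flush()
			}
		case m := <-metricC:
			for _, o := range outs {
				o.addMetric(m)
			}
		}
	}
}

func main() {
	shutdown := make(chan struct{})
	metricC := make(chan metric, 10000) // same capacity the agent now uses
	outs := []*output{{}}

	go flusher(shutdown, metricC, outs, time.Second)

	metricC <- metric{name: "cpu"}
	time.Sleep(1500 * time.Millisecond) // let one tick fire
	close(shutdown)
	time.Sleep(100 * time.Millisecond) // give the final flush time to run
}
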
2 changes: 1 addition & 1 deletion input.go
@@ -24,7 +24,7 @@ type ServiceInput interface {
Gather(Accumulator) error

// Start starts the ServiceInput's service, whatever that may be
Start() error
Start(Accumulator) error

// Stop stops the services and closes any necessary channels and connections
Stop()
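
For plugin authors, a minimal sketch of a service input written against the new interface (the plugin name and its fields are hypothetical, not part of this commit): Start receives the Accumulator once, and the background goroutine pushes metrics into it as they arrive.

package exampleinput

import (
	"time"

	"github.com/influxdata/telegraf"
)

// ExampleInput is a hypothetical service input, shown only to illustrate the
// new Start(telegraf.Accumulator) contract.
type ExampleInput struct {
	acc  telegraf.Accumulator
	done chan struct{}
}

func (e *ExampleInput) Description() string  { return "example service input" }
func (e *ExampleInput) SampleConfig() string { return "" }

// Gather is a no-op: service inputs push metrics as they arrive instead.
func (e *ExampleInput) Gather(acc telegraf.Accumulator) error { return nil }

// Start keeps the accumulator and kicks off the background collector.
func (e *ExampleInput) Start(acc telegraf.Accumulator) error {
	e.acc = acc
	e.done = make(chan struct{})
	go e.run()
	return nil
}

func (e *ExampleInput) run() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-e.done:
			return
		case t := <-ticker.C:
			// Metrics now flow straight into the shared accumulator.
			e.acc.AddFields("example",
				map[string]interface{}{"value": 42},
				map[string]string{"source": "ticker"},
				t)
		}
	}
}

// Stop shuts the background goroutine down and satisfies ServiceInput.
func (e *ExampleInput) Stop() {
	close(e.done)
}
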
12 changes: 10 additions & 2 deletions internal/config/config.go
@@ -68,7 +68,7 @@ type AgentConfig struct {
// same time, which can have a measurable effect on the system.
CollectionJitter internal.Duration

// Interval at which to flush data
// FlushInterval is the Interval at which to flush data
FlushInterval internal.Duration

// FlushJitter Jitters the flush interval by a random amount.
@@ -82,6 +82,11 @@ type AgentConfig struct {
// full, the oldest metrics will be overwritten.
MetricBufferLimit int

// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
// it fills up, regardless of FlushInterval. Setting this option to true
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
@@ -157,6 +162,8 @@ var header = `##################################################################
### Telegraf will cache metric_buffer_limit metrics for each output, and will
### flush this buffer on a successful write.
metric_buffer_limit = 10000
### Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true
### Collection jitter is used to jitter the collection by a random amount.
### Each plugin will sleep for a random time within jitter before collecting.
@@ -421,8 +428,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error {

ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBufferLimit > 0 {
ro.PointBufferLimit = c.Agent.MetricBufferLimit
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
}
ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
ro.Quiet = c.Agent.Quiet
c.Outputs = append(c.Outputs, ro)
return nil
119 changes: 91 additions & 28 deletions internal/models/running_output.go
@@ -2,22 +2,34 @@ package internal_models

import (
"log"
"sync"
"time"

"github.com/influxdata/telegraf"
)

const DEFAULT_POINT_BUFFER_LIMIT = 10000
const (
// Default number of metrics kept between flushes.
DEFAULT_METRIC_BUFFER_LIMIT = 10000

// Limit how many full metric buffers are kept due to failed writes.
FULL_METRIC_BUFFERS_LIMIT = 100
)

type RunningOutput struct {
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
PointBufferLimit int
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
FlushBufferWhenFull bool

metrics []telegraf.Metric
overwriteCounter int
metrics []telegraf.Metric
tmpmetrics map[int][]telegraf.Metric
overwriteI int
mapI int

sync.Mutex
}

func NewRunningOutput(
@@ -26,47 +38,98 @@ func NewRunningOutput(
conf *OutputConfig,
) *RunningOutput {
ro := &RunningOutput{
Name: name,
metrics: make([]telegraf.Metric, 0),
Output: output,
Config: conf,
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
Name: name,
metrics: make([]telegraf.Metric, 0),
tmpmetrics: make(map[int][]telegraf.Metric),
Output: output,
Config: conf,
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
}
return ro
}

func (ro *RunningOutput) AddPoint(point telegraf.Metric) {
// AddMetric adds a metric to the output. This function can also write cached
// points if FlushBufferWhenFull is true.
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
if ro.Config.Filter.IsActive {
if !ro.Config.Filter.ShouldMetricPass(point) {
if !ro.Config.Filter.ShouldMetricPass(metric) {
return
}
}

if len(ro.metrics) < ro.PointBufferLimit {
ro.metrics = append(ro.metrics, point)
if len(ro.metrics) < ro.MetricBufferLimit {
ro.Lock()
ro.metrics = append(ro.metrics, metric)
ro.Unlock()
} else {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] config " +
"if you do not wish to overwrite metrics.\n")
if ro.overwriteCounter == len(ro.metrics) {
ro.overwriteCounter = 0
if ro.FlushBufferWhenFull {
ro.Lock()
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
copy(tmpmetrics, ro.metrics)
ro.metrics = make([]telegraf.Metric, 0)
ro.Unlock()
err := ro.write(tmpmetrics)
if err != nil {
log.Printf("ERROR writing full metric buffer to output %s, %s",
ro.Name, err)
if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT {
ro.mapI = 0
// overwrite one
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
} else {
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
}
}
} else {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] " +
"config if you do not wish to overwrite metrics.\n")
ro.Lock()
if ro.overwriteI == len(ro.metrics) {
ro.overwriteI = 0
}
ro.metrics[ro.overwriteI] = metric
ro.overwriteI++
ro.Unlock()
}
ro.metrics[ro.overwriteCounter] = point
ro.overwriteCounter++
}
}

// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
ro.Lock()
err := ro.write(ro.metrics)
if err != nil {
return err
} else {
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteI = 0
}
ro.Unlock()

// Write any cached metric buffers that failed previously
for i, tmpmetrics := range ro.tmpmetrics {
if err := ro.write(tmpmetrics); err != nil {
return err
} else {
delete(ro.tmpmetrics, i)
}
}

return nil
}

func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
start := time.Now()
err := ro.Output.Write(ro.metrics)
err := ro.Output.Write(metrics)
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n",
len(ro.metrics), ro.Name, elapsed)
len(metrics), ro.Name, elapsed)
}
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteCounter = 0
}
return err
}
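
With flush_buffer_when_full enabled, AddMetric swaps out and resets the buffer under the lock, writes the copy outside the lock, and on a failed write parks the batch so a later Write can retry it, keeping at most FULL_METRIC_BUFFERS_LIMIT failed batches. A condensed sketch of that bookkeeping with stand-in names (illustrative, not the actual RunningOutput code):

package sketch

import (
	"sync"

	"github.com/influxdata/telegraf"
)

// maxFailedBatches plays the role of FULL_METRIC_BUFFERS_LIMIT.
const maxFailedBatches = 100

// buffer is a hypothetical stand-in for RunningOutput, reduced to the
// flush-when-full bookkeeping.
type buffer struct {
	sync.Mutex
	limit   int
	metrics []telegraf.Metric
	failed  map[int][]telegraf.Metric
	failedI int
	write   func([]telegraf.Metric) error
}

// newBuffer wires up a buffer with a write callback (e.g. an output's Write).
func newBuffer(limit int, write func([]telegraf.Metric) error) *buffer {
	return &buffer{
		limit:   limit,
		metrics: make([]telegraf.Metric, 0, limit),
		failed:  make(map[int][]telegraf.Metric),
		write:   write,
	}
}

// add appends a metric and, once the buffer is full, swaps it out under the
// lock and writes the full batch outside the lock.
func (b *buffer) add(m telegraf.Metric) {
	b.Lock()
	b.metrics = append(b.metrics, m)
	if len(b.metrics) < b.limit {
		b.Unlock()
		return
	}
	batch := b.metrics
	b.metrics = make([]telegraf.Metric, 0, b.limit)
	b.Unlock()

	if err := b.write(batch); err != nil {
		// Park the failed batch; a later successful Write retries it.
		b.Lock()
		if len(b.failed) >= maxFailedBatches {
			b.failedI = 0 // wrap around and start overwriting old batches
		}
		b.failed[b.failedI] = batch
		b.failedI++
		b.Unlock()
	}
}
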
2 changes: 1 addition & 1 deletion plugins/inputs/github_webhooks/github_webhooks.go
@@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() {
}
}

func (gh *GithubWebhooks) Start() error {
func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
go gh.Listen()
log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
return nil
43 changes: 12 additions & 31 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -1,7 +1,6 @@
package kafka_consumer

import (
"fmt"
"log"
"strings"
"sync"
@@ -19,11 +18,13 @@ type Kafka struct {
Topics []string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
MetricBuffer int

// Legacy metric buffer support
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
Offset string

Offset string
parser parsers.Parser

sync.Mutex
@@ -32,9 +33,10 @@ type Kafka struct {
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan *sarama.ConsumerError
// channel for all incoming parsed kafka metrics
metricC chan telegraf.Metric
done chan struct{}
done chan struct{}

// keep the accumulator internally:
acc telegraf.Accumulator

// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
// this is mostly for test purposes, but there may be a use-case for it later.
@@ -48,8 +50,6 @@ var sampleConfig = `
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
offset = "oldest"
@@ -72,11 +72,13 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
k.parser = parser
}

func (k *Kafka) Start() error {
func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var consumerErr error

k.acc = acc

config := consumergroup.NewConfig()
switch strings.ToLower(k.Offset) {
case "oldest", "":
@@ -106,13 +108,6 @@ func (k *Kafka) Start() error {
}

k.done = make(chan struct{})
if k.PointBuffer == 0 && k.MetricBuffer == 0 {
k.MetricBuffer = 100000
} else if k.PointBuffer > 0 {
// Legacy support of PointBuffer field TODO remove
k.MetricBuffer = k.PointBuffer
}
k.metricC = make(chan telegraf.Metric, k.MetricBuffer)

// Start the kafka message reader
go k.receiver()
@@ -138,14 +133,7 @@ func (k *Kafka) receiver() {
}

for _, metric := range metrics {
fmt.Println(string(metric.Name()))
select {
case k.metricC <- metric:
continue
default:
log.Printf("Kafka Consumer buffer is full, dropping a metric." +
" You may want to increase the metric_buffer setting")
}
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}

if !k.doNotCommitMsgs {
@@ -169,13 +157,6 @@ func (k *Kafka) Stop() {
}

func (k *Kafka) Gather(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
nmetrics := len(k.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-k.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}

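
For the Kafka consumer, the plugin-local metric channel, its buffer sizing, and the draining Gather loop are gone; parsed messages now go straight to the accumulator handed to Start. A sketch of how a test might exercise the new signature with the test accumulator (the ConsumerGroup field name and the local Zookeeper address are assumptions, and a real run would also need a reachable broker and a parser set via SetParser):

package kafka_consumer

import (
	"testing"

	"github.com/influxdata/telegraf/testutil"
)

// Illustrative only: exercising the new Start(telegraf.Accumulator) signature.
func TestStartStopWithAccumulator(t *testing.T) {
	k := &Kafka{
		ConsumerGroup:  "telegraf_test_consumers",  // assumed field name
		Topics:         []string{"telegraf"},
		ZookeeperPeers: []string{"localhost:2181"}, // assumes a local test Zookeeper
		Offset:         "oldest",
	}

	var acc testutil.Accumulator
	if err := k.Start(&acc); err != nil {
		t.Fatal(err)
	}
	defer k.Stop()

	// Metrics parsed by the receiver goroutine now land directly in acc,
	// so assertions can run against acc without calling Gather.
}
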