Skip to content

Commit

Permalink
Normalize collection interval to nearest interval
Browse files Browse the repository at this point in the history
closes #301
  • Loading branch information
sparrc committed Oct 21, 2015
1 parent 4395a46 commit a1067fa
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 84 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ collect data every 2 seconds, and flush every 60 seconds.
- `precision` and `utc` are no longer valid agent config values. `precision` has
moved to the `influxdb` output config, where it will continue to default to "s"
- debug and test output will now print the raw line-protocol string
- Telegraf will now, by default, round the collection interval to the nearest
even interval. This means that `interval="10s"` will collect every :00, :10, etc.
To ease scale concerns, flushing will be "jittered" by a random amount so that
all Telegraf instances do not flush at the same time. Both of these options can
be controlled via the `round_interval` and `flush_jitter` config options.

### Features
- [#205](https://github.com/influxdb/telegraf/issues/205): Include per-db redis keyspace info
Expand Down
74 changes: 54 additions & 20 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package telegraf
import (
"fmt"
"log"
"math/rand"
"os"
"sort"
"sync"
Expand Down Expand Up @@ -32,12 +33,19 @@ type Agent struct {
// Interval at which to gather information
Interval duration.Duration

// RoundInterval rounds collection interval to 'interval'.
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool

// Interval at which to flush data
FlushInterval duration.Duration

// FlushRetries is the number of times to retry each data flush
FlushRetries int

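// FlushJitter is the upper bound of the randomly chosen delay applied before
// each flush, so that all Telegraf instances do not flush at the same time.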
FlushJitter duration.Duration

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatibility
Expand All @@ -64,10 +72,10 @@ func NewAgent(config *Config) (*Agent, error) {
agent := &Agent{
Tags: make(map[string]string),
Interval: duration.Duration{10 * time.Second},
RoundInterval: true,
FlushInterval: duration.Duration{10 * time.Second},
FlushRetries: 2,
UTC: true,
Precision: "s",
FlushJitter: duration.Duration{5 * time.Second},
}

// Apply the toml table to the agent config, overriding defaults
Expand Down Expand Up @@ -294,30 +302,37 @@ func (a *Agent) Test() error {
return nil
}

// writeOutput writes a list of points to a single output, with retries
// writeOutput writes a list of points to a single output, with retries.
// Calls Done on the supplied WaitGroup `wg` when it returns, so callers can
// wait for the write to finish.
func (a *Agent) writeOutput(
points []*client.Point,
ro *runningOutput,
shutdown chan struct{},
wg *sync.WaitGroup,
) {
defer wg.Done()
if len(points) == 0 {
return
}
retry := 0
retries := a.FlushRetries
start := time.Now()

for {
err := ro.output.Write(points)
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.name, elapsed)
return
}

select {
case <-shutdown:
return
default:
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.name, elapsed)
return
} else if retry >= retries {
if retry >= retries {
// No more retries
msg := "FATAL: Write to output [%s] failed %d times, dropping" +
" %d metrics\n"
Expand All @@ -336,13 +351,18 @@ func (a *Agent) writeOutput(
}

// flush writes a list of points to all configured outputs
func (a *Agent) flush(points []*client.Point, shutdown chan struct{}) {
if len(points) == 0 {
return
}

func (a *Agent) flush(
points []*client.Point,
shutdown chan struct{},
wait bool,
) {
var wg sync.WaitGroup
for _, o := range a.outputs {
go a.writeOutput(points, o, shutdown)
wg.Add(1)
go a.writeOutput(points, o, shutdown, &wg)
}
if wait {
wg.Wait()
}
}

Expand All @@ -353,14 +373,23 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
time.Sleep(time.Millisecond * 100)
ticker := time.NewTicker(a.FlushInterval.Duration)
points := make([]*client.Point, 0)
jitter := rand.Int63n(int64(a.FlushJitter.Duration))
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
a.flush(points, shutdown)
a.flush(points, shutdown, true)
return nil
case <-ticker.C:
a.flush(points, shutdown)
timer := time.NewTimer(time.Duration(jitter))
select {
case <-timer.C:
a.flush(points, shutdown, false)
case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
a.flush(points, shutdown, true)
return nil
}
points = make([]*client.Point, 0)
case pt := <-pointChan:
points = append(points, pt)
Expand All @@ -375,6 +404,13 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// channel shared between all plugin threads for accumulating points
pointChan := make(chan *client.Point, 1000)

// Round collection to nearest interval by sleeping
if a.RoundInterval {
i := int64(a.Interval.Duration)
time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
}
ticker := time.NewTicker(a.Interval.Duration)

wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -412,8 +448,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {

defer wg.Wait()

ticker := time.NewTicker(a.Interval.Duration)

for {
if err := a.gatherParallel(pointChan); err != nil {
log.Printf(err.Error())
Expand Down
61 changes: 0 additions & 61 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func TestAgent_LoadPlugin(t *testing.T) {
assert.Equal(t, 2, len(pluginsEnabled))
}

// TODO enable these unit tests, currently disabled because of a circular import
func TestAgent_LoadOutput(t *testing.T) {
// load a dedicated configuration file
config, _ := LoadConfig("./testdata/telegraf-agent.toml")
Expand All @@ -56,63 +55,3 @@ func TestAgent_LoadOutput(t *testing.T) {
outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "foo", "kafka", "bar"}, config)
assert.Equal(t, 2, len(outputsEnabled))
}

/*
func TestAgent_DrivesMetrics(t *testing.T) {
var (
plugin plugins.MockPlugin
)
defer plugin.AssertExpectations(t)
defer metrics.AssertExpectations(t)
a := &Agent{
plugins: []plugins.Plugin{&plugin},
Config: &Config{},
}
plugin.On("Add", "foo", 1.2, nil).Return(nil)
plugin.On("Add", "bar", 888, nil).Return(nil)
err := a.gather()
require.NoError(t, err)
}
func TestAgent_AppliesTags(t *testing.T) {
var (
plugin plugins.MockPlugin
metrics MockMetrics
)
defer plugin.AssertExpectations(t)
defer metrics.AssertExpectations(t)
a := &Agent{
plugins: []plugins.Plugin{&plugin},
metrics: &metrics,
Config: &Config{
Tags: map[string]string{
"dc": "us-west-1",
},
},
}
m1 := cypress.Metric()
m1.Add("name", "foo")
m1.Add("value", 1.2)
msgs := []*cypress.Message{m1}
m2 := cypress.Metric()
m2.Timestamp = m1.Timestamp
m2.Add("name", "foo")
m2.Add("value", 1.2)
m2.AddTag("dc", "us-west-1")
plugin.On("Read").Return(msgs, nil)
metrics.On("Receive", m2).Return(nil)
err := a.gather()
require.NoError(t, err)
}
*/
10 changes: 9 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,19 @@ var header = `# Telegraf configuration
[agent]
# Default data collection interval for all plugins
interval = "10s"
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
# Default data flushing interval for all outputs
flush_interval = "10s"
# Jitter the flush interval by a random range
# ie, a jitter of 5s and interval 10s means flush will happen every 10-15s
flush_jitter = "5s"
# Number of times to retry each data flush
flush_retries = 2
# run telegraf in debug mode
# Run telegraf in debug mode
debug = false
# Override default hostname, if empty use os.Hostname()
hostname = ""
Expand Down
10 changes: 9 additions & 1 deletion etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,19 @@
[agent]
# Default data collection interval for all plugins
interval = "10s"
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true

# Default data flushing interval for all outputs
flush_interval = "10s"
# Jitter the flush interval by a random range
# ie, a jitter of 5s and interval 10s means flush will happen every 10-15s
flush_jitter = "5s"
# Number of times to retry each data flush
flush_retries = 2
# run telegraf in debug mode

# Run telegraf in debug mode
debug = false
# Override default hostname, if empty use os.Hostname()
hostname = ""
Expand Down
3 changes: 2 additions & 1 deletion outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ var sampleConfig = `
urls = ["http://localhost:8086"] # required
# The target database for metrics (telegraf will create it if not exists)
database = "telegraf" # required
# Precision of writes, valid values are n, u, ms, s, m, and h
# note: using second precision greatly helps InfluxDB compression
precision = "s"
# Connection timeout (for the connection with InfluxDB), formatted as a string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# If not provided, will default to 0 (no timeout)
# timeout = "5s"
# username = "telegraf"
Expand Down

0 comments on commit a1067fa

Please sign in to comment.