Skip to content

Commit

Permalink
Collection interval random jittering
Browse files Browse the repository at this point in the history
closes #460
  • Loading branch information
sparrc committed Jan 19, 2016
1 parent f3b5537 commit d3a5cca
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain!
- [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod!
- [#551](https://github.com/influxdb/telegraf/pull/551): Statsd UDP read packet size now defaults to 1500 bytes, and is configurable.
- [#552](https://github.com/influxdata/telegraf/pull/552): Support for collection interval jittering.

### Bugfixes
- [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric on timeout. Thanks @titilambert!
Expand Down
19 changes: 15 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package telegraf

import (
"crypto/rand"
cryptorand "crypto/rand"
"fmt"
"log"
"math/big"
"math/rand"
"os"
"sync"
"time"
Expand Down Expand Up @@ -92,6 +93,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

start := time.Now()
counter := 0
jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
for _, input := range a.Config.Inputs {
if input.Config.Interval != 0 {
continue
Expand All @@ -104,9 +106,19 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

acc := NewAccumulator(input.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug)
// acc.SetPrefix(input.Name + "_")
acc.SetDefaultTags(a.Config.Tags)

if jitter != 0 {
nanoSleep := rand.Int63n(jitter)
d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
if err != nil {
log.Printf("Jittering collection interval failed for plugin %s",
input.Name)
} else {
time.Sleep(d)
}
}

if err := input.Input.Gather(acc); err != nil {
log.Printf("Error in input [%s]: %s", input.Name, err)
}
Expand Down Expand Up @@ -143,7 +155,6 @@ func (a *Agent) gatherSeparate(

acc := NewAccumulator(input.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug)
// acc.SetPrefix(input.Name + "_")
acc.SetDefaultTags(a.Config.Tags)

if err := input.Input.Gather(acc); err != nil {
Expand Down Expand Up @@ -315,7 +326,7 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration {
outinterval := ininterval
if injitter.Nanoseconds() != 0 {
maxjitter := big.NewInt(injitter.Nanoseconds())
if j, err := rand.Int(rand.Reader, maxjitter); err == nil {
if j, err := cryptorand.Int(cryptorand.Reader, maxjitter); err == nil {
jitter = j.Int64()
}
outinterval = time.Duration(jitter + ininterval.Nanoseconds())
Expand Down
16 changes: 15 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,22 @@ type AgentConfig struct {
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool

// CollectionJitter is used to jitter the collection by a random amount.
// Each plugin will sleep for a random time within jitter before collecting.
// This can be used to avoid many plugins querying things like sysfs at the
// same time, which can have a measurable effect on the system.
CollectionJitter internal.Duration

// Interval at which to flush data
FlushInterval internal.Duration

// FlushRetries is the number of times to retry each data flush
FlushRetries int

// FlushJitter jitters the flush interval by a random amount.
// This is primarily to avoid large write spikes for users running a large
// number of telegraf instances.
// i.e., with a jitter of 5s and an interval of 10s, flushes will happen every 10-15s.
FlushJitter internal.Duration

// TODO(cam): Remove UTC and Precision parameters, they are no longer
Expand Down Expand Up @@ -271,6 +280,11 @@ var header = `# Telegraf configuration
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
# Collection jitter is used to jitter the collection by a random amount.
# Each plugin will sleep for a random time within jitter before collecting.
# This can be used to avoid many plugins querying things like sysfs at the
# same time, which can have a measurable effect on the system.
collection_jitter = "0s"
# Default data flushing interval for all outputs. You should not set this below
# interval. Maximum flush_interval will be flush_interval + flush_jitter
Expand Down

0 comments on commit d3a5cca

Please sign in to comment.