Skip to content

Commit

Permalink
cleanup cloudwatch plugin (influxdata#7928)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka authored and idohalevi committed Sep 23, 2020
1 parent b9872eb commit 7886aa9
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 72 deletions.
144 changes: 74 additions & 70 deletions plugins/inputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
Expand All @@ -20,65 +21,63 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
)

type (
// CloudWatch contains the configuration and cache for the cloudwatch plugin.
CloudWatch struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
CredentialPath string `toml:"shared_credential_file"`
Token string `toml:"token"`
EndpointURL string `toml:"endpoint_url"`
StatisticExclude []string `toml:"statistic_exclude"`
StatisticInclude []string `toml:"statistic_include"`
Timeout internal.Duration `toml:"timeout"`

Period internal.Duration `toml:"period"`
Delay internal.Duration `toml:"delay"`
Namespace string `toml:"namespace"`
Metrics []*Metric `toml:"metrics"`
CacheTTL internal.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"`

Log telegraf.Logger `toml:"-"`

client cloudwatchClient
statFilter filter.Filter
metricCache *metricCache
queryDimensions map[string]*map[string]string
windowStart time.Time
windowEnd time.Time
}
// CloudWatch contains the configuration and cache for the cloudwatch plugin.
type CloudWatch struct {
	// AWS endpoint and credential settings, mapped from the plugin's TOML
	// config. NOTE(review): presumably resolved by the config/aws
	// (internalaws) credential builder — wiring is outside this block.
	Region         string `toml:"region"`
	AccessKey      string `toml:"access_key"`
	SecretKey      string `toml:"secret_key"`
	RoleARN        string `toml:"role_arn"`
	Profile        string `toml:"profile"`
	CredentialPath string `toml:"shared_credential_file"`
	Token          string `toml:"token"`
	EndpointURL    string `toml:"endpoint_url"`
	// Plugin-level statistic filters; a Metric entry may carry its own
	// StatisticExclude/StatisticInclude overrides.
	StatisticExclude []string `toml:"statistic_exclude"`
	StatisticInclude []string `toml:"statistic_include"`
	// Timeout bounds each request of the HTTP client used for the
	// CloudWatch API (see initializeCloudWatch).
	Timeout config.Duration `toml:"timeout"`

	// Period is the statistic granularity; converted to whole seconds for
	// each MetricStat.Period in the generated data queries.
	Period config.Duration `toml:"period"`
	// Delay shifts the collection window back in time (see updateWindow),
	// presumably to let CloudWatch data points settle before reading.
	Delay config.Duration `toml:"delay"`
	// Namespace selects which CloudWatch namespace to collect from.
	Namespace string    `toml:"namespace"`
	Metrics   []*Metric `toml:"metrics"` // explicit metric/dimension selections
	// CacheTTL is copied into metricCache.ttl when caches are (re)built.
	CacheTTL config.Duration `toml:"cache_ttl"`
	// RateLimit presumably caps CloudWatch API call rate; defaults to 25
	// (see New). TODO confirm usage against the Gather implementation.
	RateLimit int `toml:"ratelimit"`

	Log telegraf.Logger `toml:"-"` // injected by telegraf; not user-set

	client     cloudwatchClient // CloudWatch API client
	statFilter filter.Filter    // compiled statistic include/exclude filter
	metricCache *metricCache    // cached metric lists and data queries
	// queryDimensions maps a query identifier to its dimension set.
	// NOTE(review): exact key semantics not visible in this chunk — confirm.
	queryDimensions map[string]*map[string]string
	windowStart     time.Time // start of the current collection window
	windowEnd       time.Time // end of the current collection window
}

// Metric defines a simplified Cloudwatch metric.
Metric struct {
StatisticExclude *[]string `toml:"statistic_exclude"`
StatisticInclude *[]string `toml:"statistic_include"`
MetricNames []string `toml:"names"`
Dimensions []*Dimension `toml:"dimensions"`
}
// Metric defines a simplified Cloudwatch metric.
type Metric struct {
	// Per-metric statistic filters. Pointers so nil (unset) is
	// distinguishable from an explicit empty list — presumably nil falls
	// back to the plugin-level filters; confirm in getFilteredMetrics.
	StatisticExclude *[]string `toml:"statistic_exclude"`
	StatisticInclude *[]string `toml:"statistic_include"`
	// MetricNames lists the CloudWatch metric names to collect.
	MetricNames []string `toml:"names"`
	// Dimensions restricts collection to metrics matching these
	// name/value dimension filters.
	Dimensions []*Dimension `toml:"dimensions"`
}

// Dimension defines a simplified Cloudwatch dimension (provides metric filtering).
Dimension struct {
Name string `toml:"name"`
Value string `toml:"value"`
}
// Dimension defines a simplified Cloudwatch dimension (provides metric filtering).
type Dimension struct {
	Name  string `toml:"name"`  // dimension name, e.g. "InstanceId"
	Value string `toml:"value"` // dimension value to match
}

// metricCache caches metrics, their filters, and generated queries.
metricCache struct {
ttl time.Duration
built time.Time
metrics []filteredMetric
queries []*cloudwatch.MetricDataQuery
}
// metricCache caches metrics, their filters, and generated queries.
type metricCache struct {
	// ttl is how long the cache stays valid after built; populated from
	// CloudWatch.CacheTTL at build time.
	ttl time.Duration
	// built records when the cache was populated (time.Now at build).
	built time.Time
	// metrics holds the statistic-filtered metric set.
	metrics []filteredMetric
	// queries holds the generated GetMetricData queries for those metrics.
	queries []*cloudwatch.MetricDataQuery
}

cloudwatchClient interface {
ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error)
GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error)
}
)
// cloudwatchClient is the subset of the AWS CloudWatch API this plugin
// uses, declared as an interface — presumably so tests can substitute a
// mock implementation.
type cloudwatchClient interface {
	ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error)
	GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error)
}

// SampleConfig returns the default configuration of the Cloudwatch input plugin.
func (c *CloudWatch) SampleConfig() string {
Expand Down Expand Up @@ -270,7 +269,7 @@ func (c *CloudWatch) initializeCloudWatch() {
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: c.Timeout.Duration,
Timeout: time.Duration(c.Timeout),
},
}

Expand Down Expand Up @@ -359,7 +358,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
c.metricCache = &metricCache{
metrics: fMetrics,
built: time.Now(),
ttl: c.CacheTTL.Duration,
ttl: time.Duration(c.CacheTTL),
}

return fMetrics, nil
Expand Down Expand Up @@ -395,11 +394,11 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) {
}

func (c *CloudWatch) updateWindow(relativeTo time.Time) {
windowEnd := relativeTo.Add(-c.Delay.Duration)
windowEnd := relativeTo.Add(-time.Duration(c.Delay))

if c.windowEnd.IsZero() {
// this is the first run, no window info, so just get a single period
c.windowStart = windowEnd.Add(-c.Period.Duration)
c.windowStart = windowEnd.Add(-time.Duration(c.Period))
} else {
// subsequent window, start where last window left off
c.windowStart = c.windowEnd
Expand Down Expand Up @@ -428,7 +427,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_average")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticAverage),
},
})
Expand All @@ -440,7 +439,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticMaximum),
},
})
Expand All @@ -452,7 +451,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticMinimum),
},
})
Expand All @@ -464,7 +463,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticSum),
},
})
Expand All @@ -476,7 +475,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticSampleCount),
},
})
Expand All @@ -493,7 +492,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
c.metricCache = &metricCache{
queries: dataQueries,
built: time.Now(),
ttl: c.CacheTTL.Duration,
ttl: time.Duration(c.CacheTTL),
}
} else {
c.metricCache.queries = dataQueries
Expand Down Expand Up @@ -555,14 +554,19 @@ func (c *CloudWatch) aggregateMetrics(

func init() {
inputs.Add("cloudwatch", func() telegraf.Input {
return &CloudWatch{
CacheTTL: internal.Duration{Duration: time.Hour},
RateLimit: 25,
Timeout: internal.Duration{Duration: time.Second * 5},
}
return New()
})
}

// New returns a CloudWatch plugin instance with its defaults applied:
// a one-hour metric cache TTL, an API rate limit of 25, and a
// five-second request timeout.
func New() *CloudWatch {
	plugin := &CloudWatch{
		RateLimit: 25,
		CacheTTL:  config.Duration(time.Hour),
		Timeout:   config.Duration(5 * time.Second),
	}
	return plugin
}

func sanitizeMeasurement(namespace string) string {
namespace = strings.Replace(namespace, "/", "_", -1)
namespace = snakeCase(namespace)
Expand Down
4 changes: 2 additions & 2 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ func (w *Wavefront) Connect() error {
}
w.sender = sender
} else {
w.Log.Debug("connecting over tcp using Host: %s and Port: %d", w.Host, w.Port)
w.Log.Debugf("connecting over tcp using Host: %q and Port: %d", w.Host, w.Port)
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{
Host: w.Host,
MetricsPort: w.Port,
FlushIntervalSeconds: 5,
})
if err != nil {
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %s and Port: %d", w.Host, w.Port)
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
}
w.sender = sender
}
Expand Down
1 change: 1 addition & 0 deletions plugins/processors/reverse_dns/reversedns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestSimpleReverseLookup(t *testing.T) {
}, now)

dns := newReverseDNS()
dns.Log = &testutil.Logger{}
dns.Lookups = []lookupEntry{
{
Field: "source_ip",
Expand Down

0 comments on commit 7886aa9

Please sign in to comment.