Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup cloudwatch plugin #7928

Merged
merged 1 commit into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 74 additions & 70 deletions plugins/inputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
Expand All @@ -20,65 +21,63 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
)

type (
// CloudWatch contains the configuration and cache for the cloudwatch plugin.
CloudWatch struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
CredentialPath string `toml:"shared_credential_file"`
Token string `toml:"token"`
EndpointURL string `toml:"endpoint_url"`
StatisticExclude []string `toml:"statistic_exclude"`
StatisticInclude []string `toml:"statistic_include"`
Timeout internal.Duration `toml:"timeout"`

Period internal.Duration `toml:"period"`
Delay internal.Duration `toml:"delay"`
Namespace string `toml:"namespace"`
Metrics []*Metric `toml:"metrics"`
CacheTTL internal.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"`

Log telegraf.Logger `toml:"-"`

client cloudwatchClient
statFilter filter.Filter
metricCache *metricCache
queryDimensions map[string]*map[string]string
windowStart time.Time
windowEnd time.Time
}
// CloudWatch contains the configuration and cache for the cloudwatch plugin.
type CloudWatch struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
CredentialPath string `toml:"shared_credential_file"`
Token string `toml:"token"`
EndpointURL string `toml:"endpoint_url"`
StatisticExclude []string `toml:"statistic_exclude"`
StatisticInclude []string `toml:"statistic_include"`
Timeout config.Duration `toml:"timeout"`

Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"`
Namespace string `toml:"namespace"`
Metrics []*Metric `toml:"metrics"`
CacheTTL config.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"`

Log telegraf.Logger `toml:"-"`

client cloudwatchClient
statFilter filter.Filter
metricCache *metricCache
queryDimensions map[string]*map[string]string
windowStart time.Time
windowEnd time.Time
}

// Metric defines a simplified Cloudwatch metric.
Metric struct {
StatisticExclude *[]string `toml:"statistic_exclude"`
StatisticInclude *[]string `toml:"statistic_include"`
MetricNames []string `toml:"names"`
Dimensions []*Dimension `toml:"dimensions"`
}
// Metric defines a simplified Cloudwatch metric.
type Metric struct {
StatisticExclude *[]string `toml:"statistic_exclude"`
StatisticInclude *[]string `toml:"statistic_include"`
MetricNames []string `toml:"names"`
Dimensions []*Dimension `toml:"dimensions"`
}

// Dimension defines a simplified Cloudwatch dimension (provides metric filtering).
Dimension struct {
Name string `toml:"name"`
Value string `toml:"value"`
}
// Dimension defines a simplified Cloudwatch dimension (provides metric filtering).
type Dimension struct {
Name string `toml:"name"`
Value string `toml:"value"`
}

// metricCache caches metrics, their filters, and generated queries.
metricCache struct {
ttl time.Duration
built time.Time
metrics []filteredMetric
queries []*cloudwatch.MetricDataQuery
}
// metricCache caches metrics, their filters, and generated queries.
type metricCache struct {
ttl time.Duration
built time.Time
metrics []filteredMetric
queries []*cloudwatch.MetricDataQuery
}

cloudwatchClient interface {
ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error)
GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error)
}
)
type cloudwatchClient interface {
ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error)
GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error)
}

// SampleConfig returns the default configuration of the Cloudwatch input plugin.
func (c *CloudWatch) SampleConfig() string {
Expand Down Expand Up @@ -270,7 +269,7 @@ func (c *CloudWatch) initializeCloudWatch() {
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: c.Timeout.Duration,
Timeout: time.Duration(c.Timeout),
},
}

Expand Down Expand Up @@ -359,7 +358,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
c.metricCache = &metricCache{
metrics: fMetrics,
built: time.Now(),
ttl: c.CacheTTL.Duration,
ttl: time.Duration(c.CacheTTL),
}

return fMetrics, nil
Expand Down Expand Up @@ -395,11 +394,11 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) {
}

func (c *CloudWatch) updateWindow(relativeTo time.Time) {
windowEnd := relativeTo.Add(-c.Delay.Duration)
windowEnd := relativeTo.Add(-time.Duration(c.Delay))

if c.windowEnd.IsZero() {
// this is the first run, no window info, so just get a single period
c.windowStart = windowEnd.Add(-c.Period.Duration)
c.windowStart = windowEnd.Add(-time.Duration(c.Period))
} else {
// subsequent window, start where last window left off
c.windowStart = c.windowEnd
Expand Down Expand Up @@ -428,7 +427,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_average")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticAverage),
},
})
Expand All @@ -440,7 +439,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticMaximum),
},
})
Expand All @@ -452,7 +451,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticMinimum),
},
})
Expand All @@ -464,7 +463,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticSum),
},
})
Expand All @@ -476,7 +475,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
MetricStat: &cloudwatch.MetricStat{
Metric: metric,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticSampleCount),
},
})
Expand All @@ -493,7 +492,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
c.metricCache = &metricCache{
queries: dataQueries,
built: time.Now(),
ttl: c.CacheTTL.Duration,
ttl: time.Duration(c.CacheTTL),
}
} else {
c.metricCache.queries = dataQueries
Expand Down Expand Up @@ -555,14 +554,19 @@ func (c *CloudWatch) aggregateMetrics(

func init() {
inputs.Add("cloudwatch", func() telegraf.Input {
return &CloudWatch{
CacheTTL: internal.Duration{Duration: time.Hour},
RateLimit: 25,
Timeout: internal.Duration{Duration: time.Second * 5},
}
return New()
})
}

// New instance of the cloudwatch plugin
func New() *CloudWatch {
return &CloudWatch{
CacheTTL: config.Duration(time.Hour),
RateLimit: 25,
Timeout: config.Duration(time.Second * 5),
}
}

func sanitizeMeasurement(namespace string) string {
namespace = strings.Replace(namespace, "/", "_", -1)
namespace = snakeCase(namespace)
Expand Down
4 changes: 2 additions & 2 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ func (w *Wavefront) Connect() error {
}
w.sender = sender
} else {
w.Log.Debug("connecting over tcp using Host: %s and Port: %d", w.Host, w.Port)
w.Log.Debugf("connecting over tcp using Host: %q and Port: %d", w.Host, w.Port)
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{
Host: w.Host,
MetricsPort: w.Port,
FlushIntervalSeconds: 5,
})
if err != nil {
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %s and Port: %d", w.Host, w.Port)
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
}
w.sender = sender
}
Expand Down
1 change: 1 addition & 0 deletions plugins/processors/reverse_dns/reversedns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestSimpleReverseLookup(t *testing.T) {
}, now)

dns := newReverseDNS()
dns.Log = &testutil.Logger{}
dns.Lookups = []lookupEntry{
{
Field: "source_ip",
Expand Down