-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Wavefront output enhancements #5161
Changes from all commits
4c04528
be2128c
2250da7
6a66c82
db8bd68
84ae10c
3829683
3fc7161
3ab8b9d
6819e36
15d943f
eef7ef8
28676a1
b67c04a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,31 @@ | ||
package wavefront | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"log" | ||
"net" | ||
"regexp" | ||
"strconv" | ||
"strings" | ||
|
||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/outputs" | ||
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders" | ||
) | ||
|
||
type Wavefront struct { | ||
Prefix string | ||
Url string | ||
Token string | ||
Host string | ||
Port int | ||
Prefix string | ||
SimpleFields bool | ||
MetricSeparator string | ||
ConvertPaths bool | ||
ConvertBool bool | ||
UseRegex bool | ||
SourceOverride []string | ||
StringToNumber map[string][]map[string]float64 | ||
|
||
sender wavefront.Sender | ||
} | ||
|
||
// catch many of the invalid chars that could appear in a metric or tag name | ||
|
@@ -40,43 +40,49 @@ var sanitizedChars = strings.NewReplacer( | |
// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer | ||
var sanitizedRegex = regexp.MustCompile("[^a-zA-Z\\d_.-]") | ||
|
||
var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-") | ||
var tagValueReplacer = strings.NewReplacer("*", "-") | ||
|
||
var pathReplacer = strings.NewReplacer("_", "_") | ||
|
||
var sampleConfig = ` | ||
## DNS name of the wavefront proxy server | ||
host = "wavefront.example.com" | ||
## Url for Wavefront Direct Ingestion or using HTTP with Wavefront Proxy | ||
## If using Wavefront Proxy, also specify port. example: http://proxyserver:2878 | ||
url = "https://metrics.wavefront.com" | ||
|
||
## Authentication Token for Wavefront. Only required if using Direct Ingestion | ||
#token = "DUMMY_TOKEN" | ||
|
||
## DNS name of the wavefront proxy server. Do not use if url is specified | ||
#host = "wavefront.example.com" | ||
danielnelson marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
## Port that the Wavefront proxy server listens on | ||
port = 2878 | ||
## Port that the Wavefront proxy server listens on. Do not use if url is specified | ||
#port = 2878 | ||
|
||
## prefix for metrics keys | ||
#prefix = "my.specific.prefix." | ||
|
||
## whether to use "value" for name of simple fields | ||
## whether to use "value" for name of simple fields. default is false | ||
#simple_fields = false | ||
|
||
## character to use between metric and field name. defaults to . (dot) | ||
## character to use between metric and field name. default is . (dot) | ||
#metric_separator = "." | ||
|
||
## Convert metric name paths to use metricSeperator character | ||
## When true (default) will convert all _ (underscore) chartacters in final metric name | ||
## Convert metric name paths to use metricSeparator character | ||
## When true will convert all _ (underscore) characters in final metric name. default is true | ||
#convert_paths = true | ||
|
||
## Use Regex to sanitize metric and tag names from invalid characters | ||
## Regex is more thorough, but significantly slower | ||
## Regex is more thorough, but significantly slower. default is false | ||
#use_regex = false | ||
|
||
## point tags to use as the source name for Wavefront (if none found, host will be used) | ||
#source_override = ["hostname", "agent_host", "node_host"] | ||
#source_override = ["hostname", "address", "agent_host", "node_host"] | ||
|
||
## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default true | ||
## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true | ||
#convert_bool = true | ||
|
||
## Define a mapping, namespaced by metric prefix, from string values to numeric values | ||
## The example below maps "green" -> 1.0, "yellow" -> 0.5, "red" -> 0.0 for | ||
## any metrics beginning with "elasticsearch" | ||
## deprecated in 1.9; use the enum processor plugin | ||
#[[outputs.wavefront.string_to_number.elasticsearch]] | ||
# green = 1.0 | ||
# yellow = 0.5 | ||
|
@@ -92,44 +98,51 @@ type MetricPoint struct { | |
} | ||
|
||
func (w *Wavefront) Connect() error { | ||
|
||
if len(w.StringToNumber) > 0 { | ||
log.Print("W! [outputs.wavefront] The string_to_number option is deprecated; please use the enum processor instead") | ||
} | ||
|
||
if w.Url != "" { | ||
log.Printf("D! [outputs.wavefront] connecting over http/https using Url: %s", w.Url) | ||
sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{ | ||
Server: w.Url, | ||
Token: w.Token, | ||
FlushIntervalSeconds: 5, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Telegraf considers returning from Write() without error to be successful delivery of metrics, but with the extra layer of buffering we don't actually know if the metric was sent. There may be some reduction in durability because of this, might be worth considering a sender implementation that sends immediately without buffering? |
||
}) | ||
if err != nil { | ||
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Url: %s", w.Url) | ||
} | ||
w.sender = sender | ||
} else { | ||
log.Printf("D! Output [wavefront] connecting over tcp using Host: %s and Port: %d", w.Host, w.Port) | ||
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{ | ||
Host: w.Host, | ||
MetricsPort: w.Port, | ||
FlushIntervalSeconds: 5, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %s and Port: %d", w.Host, w.Port) | ||
} | ||
w.sender = sender | ||
} | ||
|
||
if w.ConvertPaths && w.MetricSeparator == "_" { | ||
w.ConvertPaths = false | ||
} | ||
if w.ConvertPaths { | ||
pathReplacer = strings.NewReplacer("_", w.MetricSeparator) | ||
} | ||
|
||
// Test Connection to Wavefront proxy Server | ||
uri := fmt.Sprintf("%s:%d", w.Host, w.Port) | ||
_, err := net.ResolveTCPAddr("tcp", uri) | ||
if err != nil { | ||
return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error()) | ||
} | ||
connection, err := net.Dial("tcp", uri) | ||
if err != nil { | ||
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) | ||
} | ||
defer connection.Close() | ||
return nil | ||
} | ||
|
||
func (w *Wavefront) Write(metrics []telegraf.Metric) error { | ||
|
||
// Send Data to Wavefront proxy Server | ||
uri := fmt.Sprintf("%s:%d", w.Host, w.Port) | ||
connection, err := net.Dial("tcp", uri) | ||
if err != nil { | ||
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) | ||
} | ||
defer connection.Close() | ||
connection.SetWriteDeadline(time.Now().Add(5 * time.Second)) | ||
|
||
for _, m := range metrics { | ||
for _, metricPoint := range buildMetrics(m, w) { | ||
metricLine := formatMetricPoint(metricPoint, w) | ||
_, err := connection.Write([]byte(metricLine)) | ||
for _, point := range buildMetrics(m, w) { | ||
err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags) | ||
if err != nil { | ||
return fmt.Errorf("Wavefront: TCP writing error %s", err.Error()) | ||
return fmt.Errorf("Wavefront sending error: %s", err.Error()) | ||
} | ||
} | ||
} | ||
|
@@ -165,7 +178,7 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint { | |
|
||
metricValue, buildError := buildValue(value, metric.Metric, w) | ||
if buildError != nil { | ||
log.Printf("D! Output [wavefront] %s\n", buildError.Error()) | ||
log.Printf("D! [outputs.wavefront] %s\n", buildError.Error()) | ||
continue | ||
} | ||
metric.Value = metricValue | ||
|
@@ -188,8 +201,8 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string | |
} | ||
} | ||
|
||
// find source, use source_override property if needed | ||
var source string | ||
|
||
if s, ok := mTags["source"]; ok { | ||
source = s | ||
delete(mTags, "source") | ||
|
@@ -214,10 +227,25 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string | |
source = mTags["host"] | ||
} | ||
} | ||
source = tagValueReplacer.Replace(source) | ||
|
||
// remove default host tag | ||
delete(mTags, "host") | ||
|
||
return tagValueReplacer.Replace(source), mTags | ||
// sanitize tag keys and values | ||
tags := make(map[string]string) | ||
for k, v := range mTags { | ||
var key string | ||
if w.UseRegex { | ||
key = sanitizedRegex.ReplaceAllLiteralString(k, "-") | ||
} else { | ||
key = sanitizedChars.Replace(k) | ||
} | ||
val := tagValueReplacer.Replace(v) | ||
tags[key] = val | ||
} | ||
|
||
return source, tags | ||
} | ||
|
||
func buildValue(v interface{}, name string, w *Wavefront) (float64, error) { | ||
|
@@ -255,34 +283,6 @@ func buildValue(v interface{}, name string, w *Wavefront) (float64, error) { | |
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) | ||
} | ||
|
||
func formatMetricPoint(metricPoint *MetricPoint, w *Wavefront) string { | ||
buffer := bytes.NewBufferString("") | ||
buffer.WriteString(metricPoint.Metric) | ||
buffer.WriteString(" ") | ||
buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) | ||
buffer.WriteString(" ") | ||
buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) | ||
buffer.WriteString(" source=\"") | ||
buffer.WriteString(metricPoint.Source) | ||
buffer.WriteString("\"") | ||
|
||
for k, v := range metricPoint.Tags { | ||
buffer.WriteString(" ") | ||
if w.UseRegex { | ||
buffer.WriteString(sanitizedRegex.ReplaceAllLiteralString(k, "-")) | ||
} else { | ||
buffer.WriteString(sanitizedChars.Replace(k)) | ||
} | ||
buffer.WriteString("=\"") | ||
buffer.WriteString(tagValueReplacer.Replace(v)) | ||
buffer.WriteString("\"") | ||
} | ||
|
||
buffer.WriteString("\n") | ||
|
||
return buffer.String() | ||
} | ||
|
||
func (w *Wavefront) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
@@ -292,12 +292,14 @@ func (w *Wavefront) Description() string { | |
} | ||
|
||
func (w *Wavefront) Close() error { | ||
w.sender.Close() | ||
return nil | ||
} | ||
|
||
func init() { | ||
outputs.Add("wavefront", func() telegraf.Output { | ||
return &Wavefront{ | ||
Token: "DUMMY_TOKEN", | ||
MetricSeparator: ".", | ||
ConvertPaths: true, | ||
ConvertBool: true, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be some changes to Gopkg.lock as well, which is why the tests aren't working, just need to run
dep ensure
once.