Skip to content

Commit

Permalink
Parse statsd lines with multiple metric bits
Browse files Browse the repository at this point in the history
  • Loading branch information
MerlinDMC committed Nov 30, 2015
1 parent b705608 commit e9fa7b4
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 79 deletions.
20 changes: 20 additions & 0 deletions plugins/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ implementation. In short, the telegraf statsd listener will accept:
- `load.time.nanoseconds:1|h`
- `load.time:200|ms|@0.1` <- sampled 1/10 of the time

It is possible to omit repetitive names and merge individual stats into a
single line by separating them with additional colons:

- `users.current.den001.myapp:32|g:+10|g:-10|g`
- `deploys.test.myservice:1|c:101|c:1|c|@0.1`
- `users.unique:101|s:101|s:102|s`
- `load.time:320|ms:200|ms|@0.1`

This also allows for mixed types in a single line:

- `foo:1|c:200|ms`

The internals for this do parse out the metric name and repeativly use it to
create new packets for each bit in the line that can be separated by colon.
The parser will copy down the metric name to each bit until a newline is found.

The string `foo:1|c:200|ms` is internally split into two individual metrics
`foo:1|c` and `foo:200|ms` which are added to the aggregator separately.


#### Influx Statsd

In order to take advantage of InfluxDB's tagging system, we have made a couple
Expand Down
166 changes: 87 additions & 79 deletions plugins/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,101 +254,109 @@ func (s *Statsd) parseStatsdLine(line string) error {
s.Lock()
defer s.Unlock()

m := metric{}

// Validate splitting the line on "|"
pipesplit := strings.Split(line, "|")
if len(pipesplit) < 2 {
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
// Validate splitting the line on ":"
bits := strings.Split(line, ":")
if len(bits) < 2 {
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
if err != nil {
log.Printf(errmsg, err.Error(), line)
} else {
// sample rate successfully parsed
m.samplerate = samplerate
}
} else {
log.Printf(errmsg, "", line)
}
}

// Validate metric type
switch pipesplit[1] {
case "g", "c", "s", "ms", "h":
m.mtype = pipesplit[1]
default:
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line")
}
// Extract bucket name from individual metric bits
bucketName, bits := bits[0], bits[1:]

// Validate splitting the rest of the line on ":"
colonsplit := strings.Split(pipesplit[0], ":")
if len(colonsplit) != 2 {
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.bucket = colonsplit[0]
// Add a metric for each bit available
for _, bit := range bits {
m := metric{}

m.bucket = bucketName

// Parse the value
if strings.ContainsAny(colonsplit[1], "-+") {
if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
// Validate splitting the bit on "|"
pipesplit := strings.Split(bit, "|")
if len(pipesplit) < 2 {
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
if err != nil {
log.Printf(errmsg, err.Error(), line)
} else {
// sample rate successfully parsed
m.samplerate = samplerate
}
} else {
log.Printf(errmsg, "", line)
}
}
m.additive = true
}

switch m.mtype {
case "g", "ms", "h":
v, err := strconv.ParseFloat(colonsplit[1], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
// Validate metric type
switch pipesplit[1] {
case "g", "c", "s", "ms", "h":
m.mtype = pipesplit[1]
default:
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
case "c", "s":
v, err := strconv.ParseInt(colonsplit[1], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")

// Parse the value
if strings.ContainsAny(pipesplit[0], "-+") {
if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.additive = true
}

switch m.mtype {
case "g", "ms", "h":
v, err := strconv.ParseFloat(pipesplit[0], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
case "c", "s":
v, err := strconv.ParseInt(pipesplit[0], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)
}
m.intvalue = v
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)

// Parse the name & tags from bucket
m.name, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
case "g":
m.tags["metric_type"] = "gauge"
case "s":
m.tags["metric_type"] = "set"
case "ms":
m.tags["metric_type"] = "timing"
case "h":
m.tags["metric_type"] = "histogram"
}
m.intvalue = v
}

// Parse the name & tags from bucket
m.name, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
case "g":
m.tags["metric_type"] = "gauge"
case "s":
m.tags["metric_type"] = "set"
case "ms":
m.tags["metric_type"] = "timing"
case "h":
m.tags["metric_type"] = "histogram"
}
// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(tg)
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)

// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
s.aggregate(m)
}
sort.Strings(tg)
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)

s.aggregate(m)
return nil
}

Expand Down
130 changes: 130 additions & 0 deletions plugins/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,136 @@ func TestParse_MeasurementsWithSameName(t *testing.T) {
}
}

// Test that measurements with multiple bits, are treated as different outputs
// but are equal to their single-measurement representation
func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
single_lines := []string{
"valid.multiple:0|ms|@0.1",
"valid.multiple:0|ms|",
"valid.multiple:1|ms",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:2|c",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:2|h",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:2|s",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:1|g",
"valid.multiple.duplicate:1|g",
"valid.multiple.duplicate:2|g",
"valid.multiple.duplicate:1|g",
"valid.multiple.mixed:1|c",
"valid.multiple.mixed:1|ms",
"valid.multiple.mixed:2|s",
"valid.multiple.mixed:1|g",
}

multiple_lines := []string{
"valid.multiple:0|ms|@0.1:0|ms|:1|ms",
"valid.multiple.duplicate:1|c:1|c:2|c:1|c",
"valid.multiple.duplicate:1|h:1|h:2|h:1|h",
"valid.multiple.duplicate:1|s:1|s:2|s:1|s",
"valid.multiple.duplicate:1|g:1|g:2|g:1|g",
"valid.multiple.mixed:1|c:1|ms:2|s:1|g",
}

s_single := NewStatsd()
s_multiple := NewStatsd()

for _, line := range single_lines {
err := s_single.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}

for _, line := range multiple_lines {
err := s_multiple.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}

if len(s_single.timings) != 3 {
t.Errorf("Expected 3 measurement, found %d", len(s_single.timings))
}

if cachedtiming, ok := s_single.timings["metric_type=timingvalid_multiple"]; !ok {
t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found")
} else {
if cachedtiming.name != "valid_multiple" {
t.Errorf("Expected the name to be 'valid_multiple', got %s", cachedtiming.name)
}

// A 0 at samplerate 0.1 will add 10 values of 0,
// A 0 with invalid samplerate will add a single 0,
// plus the last bit of value 1
// which adds up to 12 individual datapoints to be cached
if cachedtiming.stats.n != 12 {
t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n)
}

if cachedtiming.stats.upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper)
}
}

// test if s_single and s_multiple did compute the same stats for valid.multiple.duplicate
if err := test_validate_set("valid_multiple_duplicate", 2, s_single.sets); err != nil {
t.Error(err.Error())
}

if err := test_validate_set("valid_multiple_duplicate", 2, s_multiple.sets); err != nil {
t.Error(err.Error())
}

if err := test_validate_counter("valid_multiple_duplicate", 5, s_single.counters); err != nil {
t.Error(err.Error())
}

if err := test_validate_counter("valid_multiple_duplicate", 5, s_multiple.counters); err != nil {
t.Error(err.Error())
}

if err := test_validate_gauge("valid_multiple_duplicate", 1, s_single.gauges); err != nil {
t.Error(err.Error())
}

if err := test_validate_gauge("valid_multiple_duplicate", 1, s_multiple.gauges); err != nil {
t.Error(err.Error())
}

// test if s_single and s_multiple did compute the same stats for valid.multiple.mixed
if err := test_validate_set("valid_multiple_mixed", 1, s_single.sets); err != nil {
t.Error(err.Error())
}

if err := test_validate_set("valid_multiple_mixed", 1, s_multiple.sets); err != nil {
t.Error(err.Error())
}

if err := test_validate_counter("valid_multiple_mixed", 1, s_single.counters); err != nil {
t.Error(err.Error())
}

if err := test_validate_counter("valid_multiple_mixed", 1, s_multiple.counters); err != nil {
t.Error(err.Error())
}

if err := test_validate_gauge("valid_multiple_mixed", 1, s_single.gauges); err != nil {
t.Error(err.Error())
}

if err := test_validate_gauge("valid_multiple_mixed", 1, s_multiple.gauges); err != nil {
t.Error(err.Error())
}
}

// Valid lines should be parsed and their values should be cached
func TestParse_ValidLines(t *testing.T) {
s := NewStatsd()
Expand Down

0 comments on commit e9fa7b4

Please sign in to comment.