Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Graphite line protocol listen, command and tail plugins support... #627

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
2 changes: 1 addition & 1 deletion Godeps_windows
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ golang.org/x/text 6fc2e00a0d64b1f7fc1212dae5b0c939cf6d9ac4
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ Currently implemented sources:
* docker
* elasticsearch
* exec (generic JSON-emitting executable plugin)
* execline (generic Graphite-line-protocol-emitting executable plugin)
* graphite (Graphite line protocol listen service)
* tail (Plugin to tail the files to process graphite line protocol contents)
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* influxdb
Expand Down
14 changes: 14 additions & 0 deletions internal/encoding/graphite/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package graphite

import "fmt"

// An UnsupposedValueError is returned when a parsed value is not
// supposed.
type UnsupposedValueError struct {
Field string
Value float64
}

func (err *UnsupposedValueError) Error() string {
return fmt.Sprintf(`field "%s" value: "%v" is unsupported`, err.Field, err.Value)
}
161 changes: 161 additions & 0 deletions internal/encoding/graphite/innerconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package graphite

import (
"fmt"
"strings"

"github.com/influxdata/influxdb/models"
)

const (
// DefaultSeparator is the default join character to use when joining multiple
// measurment parts in a template.
DefaultSeparator = "."
)

// Config represents the configuration for Graphite endpoints.
type InnerConfig struct {
Separator string
Tags []string
Templates []string
}

// DefaultTags returns the config's tags.
func (c *InnerConfig) DefaultTags() models.Tags {
tags := models.Tags{}
for _, t := range c.Tags {
parts := strings.Split(t, "=")
tags[parts[0]] = parts[1]
}
return tags
}

// Validate validates the config's templates and tags.
func (c *InnerConfig) Validate() error {
if err := c.validateTemplates(); err != nil {
return err
}

if err := c.validateTags(); err != nil {
return err
}

return nil
}

func (c *InnerConfig) validateTemplates() error {
// map to keep track of filters we see
filters := map[string]struct{}{}

for i, t := range c.Templates {
parts := strings.Fields(t)
// Ensure template string is non-empty
if len(parts) == 0 {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) == 1 && parts[0] == "" {
return fmt.Errorf("missing template at position: %d", i)
}

if len(parts) > 3 {
return fmt.Errorf("invalid template format: '%s'", t)
}

template := t
filter := ""
tags := ""
if len(parts) >= 2 {
// We could have <filter> <template> or <template> <tags>. Equals is only allowed in
// tags section.
if strings.Contains(parts[1], "=") {
template = parts[0]
tags = parts[1]
} else {
filter = parts[0]
template = parts[1]
}
}

if len(parts) == 3 {
tags = parts[2]
}

// Validate the template has one and only one measurement
if err := c.validateTemplate(template); err != nil {
return err
}

// Prevent duplicate filters in the config
if _, ok := filters[filter]; ok {
return fmt.Errorf("duplicate filter '%s' found at position: %d", filter, i)
}
filters[filter] = struct{}{}

if filter != "" {
// Validate filter expression is valid
if err := c.validateFilter(filter); err != nil {
return err
}
}

if tags != "" {
// Validate tags
for _, tagStr := range strings.Split(tags, ",") {
if err := c.validateTag(tagStr); err != nil {
return err
}
}
}
}
return nil
}

func (c *InnerConfig) validateTags() error {
for _, t := range c.Tags {
if err := c.validateTag(t); err != nil {
return err
}
}
return nil
}

func (c *InnerConfig) validateTemplate(template string) error {
hasMeasurement := false
for _, p := range strings.Split(template, ".") {
if p == "measurement" || p == "measurement*" {
hasMeasurement = true
}
}

if !hasMeasurement {
return fmt.Errorf("no measurement in template `%s`", template)
}

return nil
}

func (c *InnerConfig) validateFilter(filter string) error {
for _, p := range strings.Split(filter, ".") {
if p == "" {
return fmt.Errorf("filter contains blank section: %s", filter)
}

if strings.Contains(p, "*") && p != "*" {
return fmt.Errorf("invalid filter wildcard section: %s", filter)
}
}
return nil
}

func (c *InnerConfig) validateTag(keyValue string) error {
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return fmt.Errorf("invalid template tags: '%s'", keyValue)
}

if parts[0] == "" || parts[1] == "" {
return fmt.Errorf("invalid template tags: %s'", keyValue)
}

return nil
}
Loading