Skip to content

Commit

Permalink
feat: Migrate dropwizard parser to new style (#11371)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Jun 29, 2022
1 parent c9c0b91 commit f8766bc
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 181 deletions.
2 changes: 0 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,8 +1426,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
// Parser options to ignore
case "data_type", "separator", "tag_keys",
// "templates", // shared with serializers
"dropwizard_metric_registry_path", "dropwizard_tags_path", "dropwizard_tag_paths",
"dropwizard_time_format", "dropwizard_time_path",
"grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns",
"grok_timezone", "grok_unique_timestamp",
"influx_parser_type",
Expand Down
1 change: 1 addition & 0 deletions plugins/parsers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
//Blank imports for plugins to register themselves
_ "github.com/influxdata/telegraf/plugins/parsers/collectd"
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
_ "github.com/influxdata/telegraf/plugins/parsers/dropwizard"
_ "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
_ "github.com/influxdata/telegraf/plugins/parsers/graphite"
_ "github.com/influxdata/telegraf/plugins/parsers/json"
Expand Down
137 changes: 60 additions & 77 deletions plugins/parsers/dropwizard/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,32 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/templating"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
)

type TimeFunc func() time.Time

// Parser parses json inputs containing dropwizard metrics,
// either top-level or embedded inside a json field.
// This parser is using gjson for retrieving paths within the json file.
type parser struct {
// an optional json path containing the metric registry object
// if left empty, the whole json object is parsed as a metric registry
MetricRegistryPath string

// an optional json path containing the default time of the metrics
// if left empty, or if cannot be parsed the current processing time is used as the time of the metrics
TimePath string

// time format to use for parsing the time field
// defaults to time.RFC3339
TimeFormat string

// an optional json path pointing to a json object with tag key/value pairs
// takes precedence over TagPathsMap
TagsPath string

// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags
TagPathsMap map[string]string
type Parser struct {
MetricRegistryPath string `toml:"dropwizard_metric_registry_path"`
TimePath string `toml:"dropwizard_time_path"`
TimeFormat string `toml:"dropwizard_time_format"`
TagsPath string `toml:"dropwizard_tags_path"`
TagPathsMap map[string]string `toml:"dropwizard_tag_paths_map"`
Separator string `toml:"separator"`
Templates []string `toml:"templates"`
DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"`

// an optional map of default tags to use for metrics
DefaultTags map[string]string

Log telegraf.Logger `toml:"-"`

separator string
templateEngine *templating.Engine

timeFunc TimeFunc

// seriesParser parses line protocol measurement + tags
seriesParser *influx.Parser
}

func NewParser() *parser {
handler := influx.NewMetricHandler()
seriesParser := influx.NewSeriesParser(handler)

parser := &parser{
timeFunc: time.Now,
seriesParser: seriesParser,
}
return parser
}

// Parse parses the input bytes to an array of metrics
func (p *parser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)

metricTime, err := p.parseTime(buf)
Expand Down Expand Up @@ -111,38 +81,17 @@ func (p *parser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

func (p *parser) SetTemplates(separator string, templates []string) error {
if len(templates) == 0 {
p.templateEngine = nil
return nil
}

defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*")
if err != nil {
return err
}

templateEngine, err := templating.NewEngine(separator, defaultTemplate, templates)
if err != nil {
return err
}

p.separator = separator
p.templateEngine = templateEngine
return nil
}

// ParseLine is not supported by the dropwizard format
func (p *parser) ParseLine(line string) (telegraf.Metric, error) {
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
return nil, fmt.Errorf("ParseLine not supported: %s, for data format: dropwizard", line)
}

// SetDefaultTags sets the default tags
func (p *parser) SetDefaultTags(tags map[string]string) {
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func (p *parser) readTags(buf []byte) map[string]string {
func (p *Parser) readTags(buf []byte) map[string]string {
if p.TagsPath != "" {
var tagsBytes []byte
tagsResult := gjson.GetBytes(buf, p.TagsPath)
Expand All @@ -167,28 +116,26 @@ func (p *parser) readTags(buf []byte) map[string]string {
return tags
}

func (p *parser) parseTime(buf []byte) (time.Time, error) {
func (p *Parser) parseTime(buf []byte) (time.Time, error) {
if p.TimePath != "" {
timeFormat := p.TimeFormat
if timeFormat == "" {
timeFormat = time.RFC3339
}
timeString := gjson.GetBytes(buf, p.TimePath).String()
if timeString == "" {
err := fmt.Errorf("time not found in JSON path %s", p.TimePath)
return p.timeFunc(), err
return time.Time{}, fmt.Errorf("time not found in JSON path %s", p.TimePath)
}
t, err := time.Parse(timeFormat, timeString)
if err != nil {
err = fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err)
return p.timeFunc(), err
return time.Time{}, fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err)
}
return t.UTC(), nil
}
return p.timeFunc(), nil
return time.Now(), nil
}

func (p *parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
var registryBytes []byte
if p.MetricRegistryPath != "" {
regResult := gjson.GetBytes(buf, p.MetricRegistryPath)
Expand All @@ -213,7 +160,7 @@ func (p *parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
return jsonOut, nil
}

func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric {
func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, tm time.Time) []telegraf.Metric {
if dwmsTyped, ok := dwms.(map[string]interface{}); ok {
for dwmName, dwmFields := range dwmsTyped {
measurementName := dwmName
Expand All @@ -222,7 +169,7 @@ func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []te
if p.templateEngine != nil {
measurementName, tags, fieldPrefix, _ = p.templateEngine.Apply(dwmName)
if len(fieldPrefix) > 0 {
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.separator)
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.Separator)
}
}

Expand Down Expand Up @@ -258,6 +205,42 @@ func (p *parser) readDWMetrics(metricType string, dwms interface{}, metrics []te
return metrics
}

func (p *parser) SetTimeFunc(f TimeFunc) {
p.timeFunc = f
func (p *Parser) Init() error {
handler := influx.NewMetricHandler()
p.seriesParser = influx.NewSeriesParser(handler)

if len(p.Templates) != 0 {
defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*")
if err != nil {
return err
}

templateEngine, err := templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
if err != nil {
return err
}
p.templateEngine = templateEngine
}

return nil
}

func init() {
parsers.Add("dropwizard",
func(defaultMetricName string) telegraf.Parser {
return &Parser{}
})
}

func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.MetricRegistryPath = config.DropwizardMetricRegistryPath
p.TimePath = config.DropwizardTimePath
p.TimeFormat = config.DropwizardTimeFormat
p.TagsPath = config.DropwizardTagsPath
p.TagPathsMap = config.DropwizardTagPathsMap
p.Separator = config.Separator
p.Templates = append(p.Templates, config.Templates...)
p.DefaultTags = config.DefaultTags

return nil
}
Loading

0 comments on commit f8766bc

Please sign in to comment.