Skip to content

Commit

Permalink
Telegraf 1.13.1
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and athornton committed Sep 28, 2022
1 parent 06996ef commit b139e0e
Showing 1 changed file with 35 additions and 13 deletions.
48 changes: 35 additions & 13 deletions plugins/parsers/avro/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,18 @@ type Parser struct {
createMetric func(interface{}, string) (telegraf.Metric, error)
}

type metricInput struct {
Name string
Tags map[string]string
Fields map[string]interface{}
Timestamp time.Time
}

func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
schemaID, binaryData, err := p.extractSchemaAndMessage(buf)
if err != nil {
return nil, err
}

var schema string
var codec *goavro.Codec
Expand Down Expand Up @@ -92,10 +102,17 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func (p *Parser) setupMetric(native interface{}, schema string) (string, map[string]string, map[string]interface{}, time.Time, error) {
func (p *Parser) setupMetric(native interface{}, schema string) (metricInput, error) {
badReturn := metricInput{
Name: "",
Tags: nil,
Fields: nil,
Timestamp: time.Time{},
}

deep, ok := native.(map[string]interface{})
if !ok {
return "", nil, nil, time.Time{}, fmt.Errorf("cannot cast native interface {} to map[string]interface{}")
return badReturn, fmt.Errorf("cannot cast native interface {} to map[string]interface{}")
}

timestamp := nestedValue(deep[p.Timestamp])
Expand All @@ -105,7 +122,7 @@ func (p *Parser) setupMetric(native interface{}, schema string) (string, map[str

metricTime, err := internal.ParseTimestamp(p.TimestampFormat, timestamp, "UTC")
if err != nil {
return "", nil, nil, time.Time{}, err
return badReturn, err
}

if p.RoundTimestampTo != "" {
Expand Down Expand Up @@ -167,11 +184,11 @@ func (p *Parser) setupMetric(native interface{}, schema string) (string, map[str

err = json.Unmarshal([]byte(schema), &schemaObj)
if err != nil {
return "", nil, nil, time.Time{}, err
return badReturn, err
}
if len(fields) == 0 {
// A telegraf metric needs at least one field.
return "", nil, nil, time.Time{}, fmt.Errorf("number of fields is 0; unable to create metric")
return badReturn, fmt.Errorf("number of fields is 0; unable to create metric")
}
// Now some fancy stuff to extract the measurement.
// If it's set in the configuration, use that.
Expand All @@ -191,7 +208,7 @@ func (p *Parser) setupMetric(native interface{}, schema string) (string, map[str

nStr, ok := schemaObj["name"].(string)
if !ok {
return "", nil, nil, time.Time{}, fmt.Errorf("could not determine name from schema %s", schema)
return badReturn, fmt.Errorf("could not determine name from schema %s", schema)
}
name = nsStr + separator + nStr
}
Expand All @@ -202,21 +219,26 @@ func (p *Parser) setupMetric(native interface{}, schema string) (string, map[str
}
// Nothing? Give up.
if name == "" {
return "", nil, nil, time.Time{}, fmt.Errorf("could not determine measurement name")
return badReturn, fmt.Errorf("could not determine measurement name")
}
return name, tags, fields, metricTime, nil
return metricInput{
Name: name,
Tags: tags,
Fields: fields,
Timestamp: metricTime,
}, nil
}

func (p *Parser) createScalarMetric(native interface{}, schema string) (telegraf.Metric, error) {
name, tags, fields, metricTime, err := p.setupMetric(native, schema)
m, err := p.setupMetric(native, schema)
if err != nil {
return nil, err
}
return metric.New(name, tags, fields, metricTime), nil
return metric.New(m.Name, m.Tags, m.Fields, m.Timestamp), nil
}

func (p *Parser) createComplexMetric(native interface{}, schema string) (telegraf.Metric, error) {
name, tags, fields, metricTime, err := p.setupMetric(native, schema)
m, err := p.setupMetric(native, schema)
if err != nil {
return nil, err
}
Expand All @@ -227,11 +249,11 @@ func (p *Parser) createComplexMetric(native interface{}, schema string) (telegra
Middle: p.FieldSeparator,
After: "",
}
flat, err := flatten.Flatten(fields, "", sep)
flat, err := flatten.Flatten(m.Fields, "", sep)
if err != nil {
return nil, err
}
return metric.New(name, tags, flat, metricTime), nil
return metric.New(m.Name, m.Tags, flat, m.Timestamp), nil
}

func nestedValue(deep interface{}) interface{} {
Expand Down

0 comments on commit b139e0e

Please sign in to comment.