diff --git a/plugins/parsers/avro/parser.go b/plugins/parsers/avro/parser.go index c5e2ce31077ba..f85eee70e491d 100644 --- a/plugins/parsers/avro/parser.go +++ b/plugins/parsers/avro/parser.go @@ -32,8 +32,18 @@ type Parser struct { createMetric func(interface{}, string) (telegraf.Metric, error) } +type metricInput struct { + Name string + Tags map[string]string + Fields map[string]interface{} + Timestamp time.Time +} + func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { schemaID, binaryData, err := p.extractSchemaAndMessage(buf) + if err != nil { + return nil, err + } var schema string var codec *goavro.Codec @@ -92,10 +102,17 @@ func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } -func (p *Parser) setupMetric(native interface{}, schema string) (string, map[string]string, map[string]interface{}, time.Time, error) { +func (p *Parser) setupMetric(native interface{}, schema string) (metricInput, error) { + badReturn := metricInput{ + Name: "", + Tags: nil, + Fields: nil, + Timestamp: time.Time{}, + } + deep, ok := native.(map[string]interface{}) if !ok { - return "", nil, nil, time.Time{}, fmt.Errorf("cannot cast native interface {} to map[string]interface{}") + return badReturn, fmt.Errorf("cannot cast native interface {} to map[string]interface{}") } timestamp := nestedValue(deep[p.Timestamp]) @@ -105,7 +122,7 @@ func (p *Parser) setupMetric(native interface{}, schema string) (string, map[str metricTime, err := internal.ParseTimestamp(p.TimestampFormat, timestamp, "UTC") if err != nil { - return "", nil, nil, time.Time{}, err + return badReturn, err } if p.RoundTimestampTo != "" { @@ -167,11 +184,11 @@ func (p *Parser) setupMetric(native interface{}, schema string) (string, map[str err = json.Unmarshal([]byte(schema), &schemaObj) if err != nil { - return "", nil, nil, time.Time{}, err + return badReturn, err } if len(fields) == 0 { // A telegraf metric needs at least one field. - return "", nil, nil, time.Time{}, fmt.Errorf("number of fields is 0; unable to create metric") + return badReturn, fmt.Errorf("number of fields is 0; unable to create metric") } // Now some fancy stuff to extract the measurement. // If it's set in the configuration, use that. @@ -191,7 +208,7 @@ func (p *Parser) setupMetric(native interface{}, schema string) (string, map[str nStr, ok := schemaObj["name"].(string) if !ok { - return "", nil, nil, time.Time{}, fmt.Errorf("could not determine name from schema %s", schema) + return badReturn, fmt.Errorf("could not determine name from schema %s", schema) } name = nsStr + separator + nStr } @@ -202,21 +219,26 @@ func (p *Parser) setupMetric(native interface{}, schema string) (string, map[str } // Nothing? Give up. if name == "" { - return "", nil, nil, time.Time{}, fmt.Errorf("could not determine measurement name") + return badReturn, fmt.Errorf("could not determine measurement name") } - return name, tags, fields, metricTime, nil + return metricInput{ + Name: name, + Tags: tags, + Fields: fields, + Timestamp: metricTime, + }, nil } func (p *Parser) createScalarMetric(native interface{}, schema string) (telegraf.Metric, error) { - name, tags, fields, metricTime, err := p.setupMetric(native, schema) + m, err := p.setupMetric(native, schema) if err != nil { return nil, err } - return metric.New(name, tags, fields, metricTime), nil + return metric.New(m.Name, m.Tags, m.Fields, m.Timestamp), nil } func (p *Parser) createComplexMetric(native interface{}, schema string) (telegraf.Metric, error) { - name, tags, fields, metricTime, err := p.setupMetric(native, schema) + m, err := p.setupMetric(native, schema) if err != nil { return nil, err } @@ -227,11 +249,11 @@ func (p *Parser) createComplexMetric(native interface{}, schema string) (telegra Middle: p.FieldSeparator, After: "", } - flat, err := flatten.Flatten(fields, "", sep) + flat, err := flatten.Flatten(m.Fields, "", sep) if err != nil { return nil, err } - return metric.New(name, tags, flat, metricTime), nil + return metric.New(m.Name, m.Tags, flat, m.Timestamp), nil } func nestedValue(deep interface{}) interface{} {