diff --git a/plugins/parsers/avro/README.md b/plugins/parsers/avro/README.md index c071cbb06fc9f..1444208e439bd 100644 --- a/plugins/parsers/avro/README.md +++ b/plugins/parsers/avro/README.md @@ -108,7 +108,8 @@ The message is supposed to be encoded as follows: ## associate that field with the first type it sees for a given ## its value. If it receives another measurement with a different ## type in that field, it will discard that entire measurement. Be - ## sure you know what you're doing if you use the "any" type. + ## sure you know what you're doing if you use the "any" type, or + ## "nullable" with more than one non-null type. # avro_union_mode = "flatten" ## Default values for given tags: optional diff --git a/plugins/parsers/avro/parser.go b/plugins/parsers/avro/parser.go index cab68f20260c1..792f252d3d6e9 100644 --- a/plugins/parsers/avro/parser.go +++ b/plugins/parsers/avro/parser.go @@ -162,9 +162,9 @@ func (p *Parser) SetDefaultTags(tags map[string]string) { } func (p *Parser) flattenField(fldName string, fldVal map[string]interface{}) map[string]interface{} { + // Helper function for the "nullable" and "any" p.UnionModes // fldVal is a one-item map of string-to-something ret := make(map[string]interface{}) - // Helper function for the "nullable" and "any" p.UnionModes if p.UnionMode == "nullable" { _, ok := fldVal["null"] if ok { @@ -172,13 +172,7 @@ func (p *Parser) flattenField(fldName string, fldVal map[string]interface{}) map } } // Otherwise, we just return the value in the fieldname. - // - // WARNING: if you once write a measurement to InfluxDB, and you - // later give it a measurement which has a field with the same - // name but a different type, InfluxDB will reject the entire second - // measurement. Be sure you know what you're doing if you turn - // on "any", or you use "nullable" but there's more than one non-null - // type that your measurement can be. + // See README.md for an important warning about "any" and "nullable". for _, v := range fldVal { ret[fldName] = v break // Not really needed, since it's a one-item map @@ -259,7 +253,6 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg fields[k] = v } } - var schemaObj map[string]interface{} if err := json.Unmarshal([]byte(schema), &schemaObj); err != nil { return nil, fmt.Errorf("unmarshaling schema failed: %w", err) @@ -307,8 +300,7 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg } else { timestamp = time.Now() } - m := metric.New(name, tags, fields, timestamp) - return m, nil + return metric.New(name, tags, fields, timestamp), nil } func init() { diff --git a/plugins/parsers/avro/schema_registry.go b/plugins/parsers/avro/schema_registry.go index 45e725d19a2b7..bbb467dd7f81e 100644 --- a/plugins/parsers/avro/schema_registry.go +++ b/plugins/parsers/avro/schema_registry.go @@ -29,14 +29,13 @@ type schemaRegistry struct { const schemaByID = "%s/schemas/ids/%d" func newSchemaRegistry(addr string, caCertPath string) (*schemaRegistry, error) { - caCert, err := os.ReadFile(caCertPath) - if err != nil { - return nil, err - } - var client *http.Client var tlsCfg *tls.Config if caCertPath != "" { + caCert, err := os.ReadFile(caCertPath) + if err != nil { + return nil, err + } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) tlsCfg = &tls.Config{ diff --git a/plugins/parsers/avro/testdata/union-array/expected.out b/plugins/parsers/avro/testdata/union-array/expected.out new file mode 100644 index 0000000000000..7651a0067d667 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-array/expected.out @@ -0,0 +1 @@ +array,name=pi data_0=3,data_1=3.0999999046325684,data_2=3.140000104904175,data_3=3.1410000324249268 1682509200092000 diff --git a/plugins/parsers/avro/testdata/union-array/message.json b/plugins/parsers/avro/testdata/union-array/message.json new file mode 100644 index 0000000000000..b1c265ca46c2e --- /dev/null +++ b/plugins/parsers/avro/testdata/union-array/message.json @@ -0,0 +1,5 @@ +{ + "statistics_collection_time": 1682509200092, + "data": [ 3, 3.1, 3.14, 3.141 ], + "name": "pi" +} diff --git a/plugins/parsers/avro/testdata/union-array/telegraf.conf b/plugins/parsers/avro/testdata/union-array/telegraf.conf new file mode 100644 index 0000000000000..9d6c42f2e6c75 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-array/telegraf.conf @@ -0,0 +1,25 @@ +[[ inputs.file ]] + files = ["./testdata/json-array/message.json"] + data_format = "avro" + + avro_format = "json" + avro_measurement = "array" + avro_tags = ["name"] + avro_timestamp = "statistics_collection_time" + avro_timestamp_format = "unix_ms" + avro_fields = ["data"] + avro_union_mode = "any" + avro_field_separator = "_" + avro_schema = ''' + { + "namespace": "constants", + "name": "classical", + "type": "record", + "version": "1", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "data", "type": "array", "items": "float"}, + {"name": "statistics_collection_time", "type": "long"} + ] + } + '''