Skip to content

Commit

Permalink
Add array union type with mode 'any'
Browse files Browse the repository at this point in the history
  • Loading branch information
athornton committed Sep 18, 2023
1 parent 9005576 commit d5d8d59
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 17 deletions.
3 changes: 2 additions & 1 deletion plugins/parsers/avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ The message is supposed to be encoded as follows:
## associate that field with the first type it sees for
## its value. If it receives another measurement with a different
## type in that field, it will discard that entire measurement. Be
## sure you know what you're doing if you use the "any" type.
## sure you know what you're doing if you use the "any" type, or
## "nullable" with more than one non-null type.
# avro_union_mode = "flatten"

## Default values for given tags: optional
Expand Down
14 changes: 3 additions & 11 deletions plugins/parsers/avro/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,17 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
}

func (p *Parser) flattenField(fldName string, fldVal map[string]interface{}) map[string]interface{} {
// Helper function for the "nullable" and "any" p.UnionModes
// fldVal is a one-item map of string-to-something
ret := make(map[string]interface{})
// Helper function for the "nullable" and "any" p.UnionModes
if p.UnionMode == "nullable" {
_, ok := fldVal["null"]
if ok {
return ret // Return the empty map
}
}
// Otherwise, we just return the value in the fieldname.
//
// WARNING: if you once write a measurement to InfluxDB, and you
// later give it a measurement which has a field with the same
// name but a different type, InfluxDB will reject the entire second
// measurement. Be sure you know what you're doing if you turn
// on "any", or you use "nullable" but there's more than one non-null
// type that your measurement can be.
// See README.md for an important warning about "any" and "nullable".
for _, v := range fldVal {
ret[fldName] = v
break // Not really needed, since it's a one-item map
Expand Down Expand Up @@ -259,7 +253,6 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg
fields[k] = v
}
}

var schemaObj map[string]interface{}
if err := json.Unmarshal([]byte(schema), &schemaObj); err != nil {
return nil, fmt.Errorf("unmarshaling schema failed: %w", err)
Expand Down Expand Up @@ -307,8 +300,7 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg
} else {
timestamp = time.Now()
}
m := metric.New(name, tags, fields, timestamp)
return m, nil
return metric.New(name, tags, fields, timestamp), nil
}

func init() {
Expand Down
9 changes: 4 additions & 5 deletions plugins/parsers/avro/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ type schemaRegistry struct {
const schemaByID = "%s/schemas/ids/%d"

func newSchemaRegistry(addr string, caCertPath string) (*schemaRegistry, error) {
caCert, err := os.ReadFile(caCertPath)
if err != nil {
return nil, err
}

var client *http.Client
var tlsCfg *tls.Config
if caCertPath != "" {
caCert, err := os.ReadFile(caCertPath)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsCfg = &tls.Config{
Expand Down
1 change: 1 addition & 0 deletions plugins/parsers/avro/testdata/union-array/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
array,name=pi data_0=3,data_1=3.0999999046325684,data_2=3.140000104904175,data_3=3.1410000324249268 1682509200092000
5 changes: 5 additions & 0 deletions plugins/parsers/avro/testdata/union-array/message.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"statistics_collection_time": 1682509200092,
"data": [ 3, 3.1, 3.14, 3.141 ],
"name": "pi"
}
25 changes: 25 additions & 0 deletions plugins/parsers/avro/testdata/union-array/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[[ inputs.file ]]
files = ["./testdata/union-array/message.json"]
data_format = "avro"

avro_format = "json"
avro_measurement = "array"
avro_tags = ["name"]
avro_timestamp = "statistics_collection_time"
avro_timestamp_format = "unix_ms"
avro_fields = ["data"]
avro_union_mode = "any"
avro_field_separator = "_"
avro_schema = '''
{
"namespace": "constants",
"name": "classical",
"type": "record",
"version": "1",
"fields": [
{"name": "name", "type": "string"},
{"name": "data", "type": "array", "items": "float"},
{"name": "statistics_collection_time", "type": "long"}
]
}
'''

0 comments on commit d5d8d59

Please sign in to comment.