Avro processor doesnt support Unions #12970

Closed
pranavnag18 opened this issue Mar 28, 2023 · 18 comments · Fixed by #13945
Labels
feature request Requests for new plugin and for new features to existing plugins

Comments

@pranavnag18

Use Case

The Telegraf Avro processor doesn't support unions. Here is a sample schema and an encoded message.

schemaJSON := `
	{
		"type": "record",
		"name": "test",
		"fields": [
			{"name": "field1", "type": "int"},
			{"name": "field2", "type": ["null", "string"]}
		]
	}`

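// Example Avro message with a union type: field1 = 1, then the union
// branch index 1 ("string"), then the length-prefixed string "hello"
messageBytes := []byte("\x02\x02\x0Ahello")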
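
For reference, the Avro binary encoding writes a union as the zero-based index of the chosen branch followed by the value of that branch, and writes ints, longs and string lengths as zigzag variable-length integers. A minimal sketch of encoding a record for the schema above by hand, using only the Go standard library (the helper names are hypothetical and are not part of Telegraf or goavro):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendLong appends v using zigzag variable-length encoding, which is what
// Avro uses for int and long and what binary.PutVarint implements.
func appendLong(buf []byte, v int64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutVarint(tmp[:], v)
	return append(buf, tmp[:n]...)
}

// appendString appends the byte length as a long, then the raw bytes.
func appendString(buf []byte, s string) []byte {
	buf = appendLong(buf, int64(len(s)))
	return append(buf, s...)
}

// encodeTestRecord encodes field1 (int) and field2 (["null", "string"]).
// A nil field2 selects branch 0 ("null"), which carries no payload.
func encodeTestRecord(field1 int32, field2 *string) []byte {
	buf := appendLong(nil, int64(field1))
	if field2 == nil {
		return appendLong(buf, 0) // branch index 0: null
	}
	buf = appendLong(buf, 1) // branch index 1: string
	return appendString(buf, *field2)
}

func main() {
	s := "hello"
	fmt.Printf("% x\n", encodeTestRecord(1, &s)) // 02 02 0a 68 65 6c 6c 6f
	fmt.Printf("% x\n", encodeTestRecord(1, nil)) // 02 00
}
```

The second byte is the branch index; without it a decoder would read the string length as the branch selector.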

Expected behavior

Avro processor to support unions

Actual behavior

The Avro processor failed with the error "Could not convert map[string:abc] to string for tag "field2": type "map[string]interface {}" unsupported".

https://github.com/influxdata/telegraf/blob/master/plugins/parsers/avro/parser.go#L143

The conversion should handle the type map[string]interface{}.
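
To illustrate the request: the value in the error message is a single-key map whose key is the union branch name and whose value is the actual datum. A minimal sketch of unwrapping such a value before converting it to a tag or field, assuming that shape (unwrapUnion is a hypothetical helper, not a function in the Telegraf parser):

```go
package main

import "fmt"

// unwrapUnion returns the branch name and inner value when v looks like a
// decoded union (a map with exactly one key). Otherwise it returns v
// unchanged with ok == false.
func unwrapUnion(v interface{}) (branch string, value interface{}, ok bool) {
	m, isMap := v.(map[string]interface{})
	if !isMap || len(m) != 1 {
		return "", v, false
	}
	for k, inner := range m {
		return k, inner, true
	}
	return "", v, false // not reached: m has exactly one entry
}

func main() {
	decoded := map[string]interface{}{"string": "abc"}
	branch, value, ok := unwrapUnion(decoded)
	fmt.Println(branch, value, ok) // string abc true
}
```

A real Avro map or record with a single key has the same Go shape, so an actual implementation would need the schema to tell the two apart.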

Additional info

#11816

@pranavnag18 pranavnag18 added the feature request Requests for new plugin and for new features to existing plugins label Mar 28, 2023
@powersj
Contributor

powersj commented Mar 28, 2023

Hi,

Hmm not sure we support any of the complex types of Avro.

How do unions work? Would we need to go through the types in the map and attempt to apply those to the value?

@athornton thoughts on this?

Thanks!

@powersj powersj added the waiting for response waiting for response from contributor label Mar 28, 2023
@athornton
Contributor

I haven't really given the matter any thought--none of our schemas use unions, and I never have looked at them. The example given in the issue is for what mypy would call Optional[str], that is, something that can either be a string or be a null value.

I feel like we could handle them like we do with the field separator for unpacking array types, and have something like

fieldname_string, fieldname_null

But fieldname_null would always have the nil value so I'm not sure if that would help. Definitely something that needs some thought. It's definitely something the underlying parser supports: see https://github.com/linkedin/goavro/blob/master/union.go

I need to understand a) how this works on the Avro side, and then b) what the right way of representing this as a line of Telegraf data would be.

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label Mar 28, 2023
@BelousovAntonV
Contributor

Hi, we have a field Value that can be float or null. Specifying an Avro union in the schema, like "type":["null", "float"], gives us an unexpected result: the field "Value" is renamed to "Value_float". There is a set of metrics and we have 4 schemas for them: for long, float, boolean and string values. We expect this set of metrics to have the same field "Value" on the Influx side, but instead we get 4 fields named "Value_" + {type_name}. Could you please add support for Avro unions?
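
The renaming described here is what falls out of treating a decoded union like any other nested map and flattening it with the separator. A minimal sketch of that effect, assuming the union arrives as a single-key map keyed by branch name (flattenField is a hypothetical name, not the parser's function):

```go
package main

import "fmt"

// flattenField returns the output fields for one input field. Nested maps
// are flattened by joining the parent name and the key with sep, so a union
// value {"float": 1.5} under "Value" becomes "Value_float".
func flattenField(field, sep string, v interface{}) map[string]interface{} {
	out := map[string]interface{}{}
	if m, ok := v.(map[string]interface{}); ok {
		for k, inner := range m {
			out[field+sep+k] = inner
		}
		return out
	}
	out[field] = v
	return out
}

func main() {
	// One message per schema: the same logical field gets four names.
	for _, v := range []map[string]interface{}{
		{"long": int64(7)},
		{"float": float32(1.5)},
		{"boolean": true},
		{"string": "on"},
	} {
		fmt.Println(flattenField("Value", "_", v))
	}
}
```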

Thank you!

@athornton
Contributor

It's not really clear to me how to do this. On the Telegraf side, there's no concept of unions. Is discarding a measurement where the value is null rather than the data type the right thing to do?

@BelousovAntonV
Contributor

BelousovAntonV commented May 24, 2023

@athornton, no, discarding a null value is not the correct thing. The union data type means that the value could be any of the data types specified in the union.

Let me describe the issue in detail. We receive data from a sensor. It produces integer values, but at some point it can send a null value, for example when switching modes, and that is also meaningful to us. When we load data into Kafka using NiFi, we use an Avro schema in the schema registry. If we specify only the integer data type, the null value is discarded. For that case we use an Avro union [null, integer], which simply means that the value in the field can be an integer OR null. But when we load data from Kafka using Telegraf with the same schema registry, it tries to process the value as an array, which is incorrect.

Here is the information about avro unions: https://avro.apache.org/docs/1.8.0/spec.html#Unions

@powersj
Contributor

powersj commented May 24, 2023

The union data type means that the value could be any of the data types specified in the union.

This will not work for all outputs. For example, with InfluxDB, if a value originally comes in as an int but later comes in as a string, the database will reject the entire metric. This is true of outputs to schema-based databases as well.

For outputs, like a file, maybe kafka, where data type is not really taken into consideration, I can see how this is a non-issue.

My inclination is that we make this a configuration option, avro_union_mode that does one of the following:

  • skip: ignores these, default, existing behavior
  • first: takes the value of the union if it matches the first declared data type. The first value declared in the union is supposed to be the default data type.
  • any: takes value if it matches any of the union values.

Thoughts on that?

@BelousovAntonV
Contributor

Hi @powersj, the issue with InfluxDB is real, but it should be managed by the person doing the integration. Of course, if a message contains both int and string data types, InfluxDB will reject it. But when there is only null plus one other data type, it will work with InfluxDB, or with any SQL database if the column is nullable.

About options.

  • skip: ignores these, default, existing behavior

Shouldn't the parser then create 2 fields in that case? One for the null value too, because at the moment it creates only the field with the data type name, for example "Value_long" if the separator is set to "_".

If you specify an integer default value for the integer data type, then yes. But if null is the default, then the "null" data type will be specified first. How should this option work when an integer value arrives but the first data type is "null"?

  • any: takes value if it matches any of the union values.

This should work for our case.

P.S. I've looked into the code a bit and found that this behaviour is a 'feature' of the module being used.

Now I'm wondering whether a configuration option 'avro_union_fields' with a list of field names should be added, with some logic applied while flattening the listed fields (for example, just renaming the fields back). And overall, should all Avro fields be flattened?

Thank you

@powersj
Contributor

powersj commented May 24, 2023

Shouldn't the parser then create 2 fields in that case? One for the null value too, because at the moment it creates only the field with the data type name, for example "Value_long" if the separator is set to "_".

No, parsing the field should produce only one field. The union only says that the value could be of different types.

How should this option work when an integer value arrives but the first data type is "null"?

I see that it is common to set "null" as the first to mark a value optional. In which case, maybe this "first" option does not make sense in that scenario.

if the configuration option 'avro_union_fields' with a list of the field names should be added

I see this support as a light switch. Either you want to parse all unions you come across or not. Is there a compelling case for why a user would want to only parse some unions?

@BelousovAntonV
Contributor

BelousovAntonV commented May 25, 2023

I see this support as a light switch. Either you want to parse all unions you come across or not. Is there a compelling case for why a user would want to only parse some unions?

I mean creating an option avro_union_fields instead of avro_union_mode, just to mark the fields that should be parsed as unions. If some field is not needed in the result field set, then it could simply be left out of the avro_union_fields list too. Or maybe the option's name should be avro_fields_to_parse_as_union?

When there are 2 different types in the union, for example integer and string, and this field is not specified in the avro_union_fields option, it will be parsed as usual and we will get Value_string and Value_integer fields at the end, as it works at the current moment. But if it is specified, it will be parsed as you described for the option any.

@powersj
Contributor

powersj commented May 25, 2023

If some field is not needed in the result field set, then it could simply be left out of the avro_union_fields list too.

If you do not want some fields, then you should use the field_drop option. See the modifiers docs. Having an option in every plugin to choose what fields we do or do not want is done via these standard options.

it will be parsed as usual and we will get Value_string and Value_integer fields at the end, as it works at the current moment

This is not my understanding of how this works currently. The parser does not know how to handle unions, so you should be getting an error about parsing a union field. Again, we should never be creating two fields from one.

@BelousovAntonV
Contributor

BelousovAntonV commented May 25, 2023

Well, in this case avro_union_mode set to any is OK for me. But, according to the code of the goavro codec, it looks like we do not receive nulls when the field has a null value and the "null" data type, which is incorrect behaviour. Please correct me if I'm wrong.

@powersj
Contributor

powersj commented May 25, 2023

If you are referring to the goavro Union function where it returns nil: that only happens if the name passed in is "null" and the datum, the actual data, is nil (not null) (e.g. goavro.Union("null", nil)).
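
The behaviour described here can be pictured with a small stand-in written from this description alone. The union function below is hypothetical and is not goavro's source; it only returns a bare nil for the ("null", nil) pair and wraps everything else in a single-key map:

```go
package main

import "fmt"

// union mimics the described behaviour: only the pair ("null", nil)
// collapses to nil. Every other pair becomes {name: datum}.
func union(name string, datum interface{}) interface{} {
	if name == "null" && datum == nil {
		return nil
	}
	return map[string]interface{}{name: datum}
}

func main() {
	fmt.Println(union("null", nil) == nil) // true
	fmt.Println(union("string", "abc"))    // map[string:abc]
	fmt.Println(union("string", nil))      // map[string:<nil>]
}
```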

@athornton
Contributor

athornton commented Jun 21, 2023

I've been talking to my colleague @afausti about what the desired behavior is, and we're getting pretty close to being ready to start an implementation. (The reason for this weird-from-the-outside division of labor is that Angelo understands what we're doing with Kafka and Avro way better than I do, but I'm the person with the most Golang experience on our team.)

@athornton
Contributor

After some playing with it...

Implementing "any" will be straightforward, but very unhelpful if your telegraf output is influxdb, since you're going to get a field type mismatch as soon as it tries to write a point with a new data type. I guess if we document it like that people can do it at their own risk. I have no sense for how many people use telegraf to write to things that are not InfluxDB.

"first" doesn't really make sense as-is, because the common case (and the one I now need to support for our internal usage) is ["null","<real-type>"] and I think that should be called something like optional or nullable. However: if you get a nil value, which you will, for a "null" type value, the goavro parser will simply return nothing. So you will get a measurement that is missing that field. For our uses that's OK, but someone-who-isn't-me might want to take that up with the goavro people too, and see if you can convince them to add an option to return a record saying "this record is empty" rather than literal nil. Again, if you're writing to influxdb this might be problematic. In the current state of the world, it's just a matter of recognizing that you do not want to rename fields that are ["null","something"] because your null records will not get written anyway.

If you know you have only ["null","something"] as your union type, it's the same as any. I kinda suspect that's the most common use of union types, but I don't know that for sure.

@athornton
Contributor

Did some work on this today. I didn't like skip--I turned that into flatten which is more descriptive of what happens. It's too bad that the test suite only lets you push a single metric at a time (and I am not feeling ambitious enough to rewrite it), but I have single-metric test cases for nullable and any.
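
A rough sketch of how the three mode names discussed in this thread (flatten, nullable, any) could map a decoded union value onto metric fields. This is only one reading of the comments here, not the code in the linked branch, and applyUnionMode is a hypothetical name:

```go
package main

import "fmt"

// applyUnionMode adds the field for one decoded union value to fields.
// v is nil for a null datum, or a single-key map {branch: value}.
func applyUnionMode(fields map[string]interface{}, mode, name, sep string, v interface{}) error {
	if v == nil {
		return nil // null datum: the field is simply absent from the metric
	}
	m, ok := v.(map[string]interface{})
	if !ok || len(m) != 1 {
		return fmt.Errorf("field %q: not a decoded union: %v", name, v)
	}
	for branch, inner := range m {
		switch mode {
		case "flatten":
			fields[name+sep+branch] = inner // e.g. Value_float
		case "nullable", "any":
			fields[name] = inner // keep the original field name
		default:
			return fmt.Errorf("unknown union mode %q", mode)
		}
	}
	return nil
}

func main() {
	for _, mode := range []string{"flatten", "nullable", "any"} {
		fields := map[string]interface{}{}
		v := map[string]interface{}{"float": 1.5}
		if err := applyUnionMode(fields, mode, "Value", "_", v); err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(mode, fields)
	}
}
```

At the value level nullable and any behave the same in this sketch. The difference would sit at the schema level, where nullable expects exactly ["null", "something"].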

Currently at https://github.com/lsst-sqre/telegraf/tree/features/avro-unions-redux and once I get it built on Linux (there's a bizarre dependency problem on Amazon Linux that I don't have on Mac, but I can't jam a Darwin binary into a Linux container and have it run, obv) so I can test in our containerized environment, I'll PR it.

@BelousovAntonV
Contributor

BelousovAntonV commented Sep 12, 2023

Hi @athornton,

Let me describe our use case. We write data to InfluxDB via Telegraf, but we also write data to historian storage and process it with Spark, and consume data with Camel-K, Spark Streaming, Flink and other tools. We have only one Schema Registry for all services and use the Confluent Wire Protocol for schema IDs.

The only issue for us is that when we load data via Telegraf and the Avro schema uses a union (e.g. ["null", "float"]), we receive renamed fields in InfluxDB. Instead of the expected field "Value" we get two fields, "Value_null" (which is not actually passed to InfluxDB) and "Value_float". The "Value" field is renamed this way for every data type, so in the end we have 5 different "Value" fields. To avoid this we had to create a schema for each data type without unions, plus one additional schema with the "null" data type, which adds extra operations to the data flow.

@athornton
Contributor

athornton commented Sep 12, 2023

OK. I have (it's still being tested) a fix for this, because this is something we want at Rubin too. It's not ready to be PRed yet but if you want to look at my fork/branch, please do, and also please feel free to try docker.io/lsstsqre/telegraf:avrounions .

The fork/branch is at https://github.com/lsst-sqre/telegraf/tree/features/avro-unions-redux

The option you will want to use is avro_union_mode = "nullable"

@athornton
Contributor

I have this to a state where it seems to be working pretty well for us, so I've opened the PR.

4 participants