-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enforce that all field schemas must be optional in KSQL #2768
Merged
big-andy-coates
merged 10 commits into
confluentinc:master
from
big-andy-coates:ksql_schema_3
May 7, 2019
Merged
Enforce that all field schemas must be optional in KSQL #2768
big-andy-coates
merged 10 commits into
confluentinc:master
from
big-andy-coates:ksql_schema_3
May 7, 2019
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
agavra
approved these changes
May 2, 2019
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks @big-andy-coates. I have one concern with behavior regarding recursive optionality inline.
JimGalasyn
reviewed
May 2, 2019
JimGalasyn
approved these changes
May 2, 2019
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, with one suggestion.
rodesai
reviewed
May 6, 2019
ksql-common/src/test/java/io/confluent/ksql/function/KsqlFunctionTest.java
Outdated
Show resolved
Hide resolved
ksql-common/src/test/java/io/confluent/ksql/function/KsqlFunctionTest.java
Outdated
Show resolved
Hide resolved
ksql-engine/src/test/java/io/confluent/ksql/function/BaseAggregateFunctionTest.java
Outdated
Show resolved
Hide resolved
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Fixes: #2769
The fields in the row schema in KSQL as supposed to always be optional. (This is work @rodesai did previously). This is important because at the moment if a UDF throws an exception while processing a field KSQL outputs the row with a
null
value in the field. So fields must be nullable.However, while introducing
KsqlSchema
recently I noted there are some points in the code base where this is not being adhered to: the return values of UDF and UDAFs. This PR looks to address this.Obviously, in time we'll likely want to support
NOT NULL
column defs. But that requires some thought! Until that happens, we should be consistent and always have nullable fields.The introduction of our own schema type
KsqlSchema
allows us to add a check for non-optional schemas in one place. Which this PR does.Reviewing notes:
ensureOptional
method inSchemaUtil
and associated tests.KsqlSchema
and associated tests.KsqlFunction
andBaseAggregateFunction
, and associated tests.UdfCompiler
to ensure UDAFs have optional return types, and associated tests.UdfLoader
to ensure UDFs have optional return types, and associated tests. Note: I've refactoredUdfLoaderTest
as well to make it faster. It was taking ~15 seconds, where as it now takes < 1s.5_3_0_pre
expected topology files that have changed to reflect that fields are now deep-optional. See backwards compatibility section below.InsertValuesExecutorTest
as it is no longer possible to get the test pre-conditions i.e. ksql schemas with non-optionals.Backwards compatibility
Obviously, this PR changes the schema KSQL is using to serialize and deserialize data to/from topics, so we need to be careful this is not breaking compatibility.
For the schema-less formats, (JSON, DELIMITED) there is no issue as we're widening the range of permissible values. The serde classes will continue to work as expected.
That leaves AVRO, and luckily AVRO would see this as a forward compatible change, i.e. applications using the new schema can read data written with the older schema. There would be issues if a user was to roll back after writing data, but this is something we've done in the past. Likewise, if there are any downstream clients still using the old schema to read, this will fail. While not ideal, I think this is acceptable and I'll call it out in the release and upgrade notes.
Testing done
Unit and functional
Manual testing:
brought up 5.2 KSQL node and created a persistent query that would have non-optional schema. NB:
collect_list
has a non-optional return value in 5.2.$ bin/ksql-datagen quickstart=clickstream format=avro topic=clickstream maxInterval=100 iterations=100
ksql> CREATE STREAM clickstream with (kafka_topic='clickstream',value_format='AVRO');
ksql> CREATE TABLE TEST AS SELECT userid, collect_list(status) as statuses FROM clickstream GROUP BY userid;
Confirmed I can select from output topic
SELECT * FROM TEST LIMIT 5;
when re-running datagen.Stopped KSQL server and CLI
Started KSQL on this PR's version.
Re-confirmed I can select from output topic
SELECT * FROM TEST LIMIT 5;
when re-running datagen.Confirmed two Avro schema versions in SR for
TEST-value
$ curl -X GET http://localhost:8081/subjects/TEST-value/versions
[1,2]
$ curl -X GET http://localhost:8081/subjects/TEST-value/versions/1
{"subject":"TEST-value","version":1,"id":6,"schema":"{"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"USERID","type":["null","int"],"default":null},{"name":"STATUSES","type":{"type":"array","items":["null","string"]}}]}"}
$ curl -X GET http://localhost:8081/subjects/TEST-value/versions/2
{"subject":"TEST-value","version":2,"id":9,"schema":"{"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"USERID","type":["null","int"],"default":null},{"name":"STATUSES","type":["null",{"type":"array","items":["null","string"]}],"default":null}]}"}
Note: the type of
STATUSES
has changed from an array type to a union of either null or array. Yay!No errors reported & everything worked as expected.
Reviewer checklist