-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement user defined table functions #3687
Conversation
5b9c45d
to
bad8bb3
Compare
bad8bb3
to
927fa36
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of my comments are nits except for the one about decimals - the code should be failing since explode
doesn't provide any schema return type for the BigDecimal, I'm not sure why it's not.
addFunction(theClass, annotation, method, invoker, path); | ||
functionRegistry.ensureFunctionFactory(factory); | ||
|
||
Arrays.stream(theClass.getMethods()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: I feel like I'd prefer this as a good old fashion for-loop instead of using the streams API, especially since it's side effect driven anyway (i.e. it ends with a forEach
). That way we can ditch all of the Optional stuff that happens here and just add it to the factory directly within the loop...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree. I'm not usually a fan of using streams for the sake of it. This is a copy and paste (old code) but I will change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I would rather make the change in a later PR, the reason being that in a later PR I split out the UdfLoader into multiple loader classes, and if I change here I'll just have to reapply the change anyway in the later PR.
|
||
final Schema javaReturnSchema = getReturnType(method, udfAnnotation.schema()); | ||
|
||
return KsqlFunction.create( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not your code, but I just wanted to express how insane this method call is 🙄 maybe we can at least move the ksqlConfig factory to a dedicated method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, would be clearer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will apply this as part of the review of #3689 as it gets factored out anyway (as above)
+ " methods must be static. class={}, method={}, name={}", | ||
method.getDeclaringClass(), | ||
method.getName(), | ||
udafAnnotation.name() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think this can move into a .peek()
on the stream to make the filter cleaner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
return Optional.<UdafFactoryInvoker>empty(); | ||
}).filter(Optional::isPresent) | ||
.map(Optional::get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: I mentioned this above, but I feel like using the java8 stream API in this way (turning exceptions into optionals and then filtering them out) adds unnecessary mental load... as with all style comments, feel free to disagree and drop this comment.
Same goes for the UDTF code below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
ksql-engine/src/main/java/io/confluent/ksql/function/udtf/array/Explode.java
Outdated
Show resolved
Hide resolved
ksql-engine/src/test/java/io/confluent/ksql/function/UdtfLoaderTest.java
Show resolved
Hide resolved
ksql-udf/src/main/java/io/confluent/ksql/function/udtf/UdtfDescription.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Description
Implements #3508
This is the 3rd PR in a stack. This is stacked on #3683 so don't review the previous PRs again!
This PR implements user defined table functions. I..e. it allows for the user to provide their own table function implementations in a similar way to how UDFs are implemented using annotations.
Table functions support much of the same functionality as UDFs. They can have multiple parameters which can be of any of the KSQL types. Return values are a List where T is any KSQL type.
The pattern used here is very much the same as that used for UDFs. A lot of files have been touched but that is mainly due to renames.
Schema provider is not supported in this PR and will be the subject of a further PR.
Testing done
Added some tests to test UDTFs with different types of parameters and return types are loaded.
Added some more QTT tests
Reviewer checklist