Skip to content

Commit

Permalink
feat: change the public API of schema provider method (#3287)
Browse files Browse the repository at this point in the history
* change public api of method schema provider to use SqlType instead of Schema

* Updated docs and tests

* forgot test jar

* reverted pom changes, applied almog's comments

* applied Andy's comment
  • Loading branch information
vpapavas authored Sep 4, 2019
1 parent 13fde33 commit 1324285
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 25 deletions.
2 changes: 1 addition & 1 deletion docs/developer-guide/udf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ implement a UDF with a non-deterministic return type. A UDF which returns ``BigD
for example, may vary the precision and scale of the output based on the input schema.

To use this functionality, you need to specify a method with signature
``public Schema <your-method-name>(final List<Schema> params)`` and annotate it with ``@SchemaProvider``.
``public SqlType <your-method-name>(final List<SqlType> params)`` and annotate it with ``@SchemaProvider``.
Also, you need to link it to the corresponding UDF by using the ``schemaProvider=<your-method-name>``
parameter of the ``@Udf`` annotation.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.confluent.ksql.function.udf.Kudf;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
Expand Down Expand Up @@ -177,9 +178,11 @@ private void checkMatchingReturnTypes(final Schema s1, final Schema s2) {
if (!SchemaUtil.areCompatible(s1, s2)) {
throw new KsqlException(String.format("Return type %s of UDF %s does not match the declared "
+ "return type %s.",
s1.toString(),
SchemaConverters.connectToSqlConverter().toSqlType(
s1).toString(),
functionName,
s2.toString()));
SchemaConverters.connectToSqlConverter().toSqlType(
s2).toString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.TypeContextUtil;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.security.ExtensionSecurityManager;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -404,8 +405,12 @@ private Function<List<Schema>,Schema> handleUdfSchemaProviderAnnotation(
final Method m = findSchemaProvider(theClass, schemaProviderName);
final Object instance = instantiateUdfClass(theClass, annotation);

return parameterTypes -> {
return invokeSchemaProviderMethod(instance, m, parameterTypes, annotation);
return parameterSchemas -> {
final List<SqlType> parameterTypes = parameterSchemas.stream()
.map(p -> SchemaConverters.connectToSqlConverter().toSqlType(p))
.collect(Collectors.toList());
return SchemaConverters.sqlToConnectConverter().toConnectSchema(invokeSchemaProviderMethod(
instance, m, parameterTypes, annotation));
};
}

Expand All @@ -421,15 +426,17 @@ private Method findSchemaProvider(final Class<?> theClass,
return m;
} catch (NoSuchMethodException e) {
throw new KsqlException(String.format(
"Cannot find schema provider method with name %s and parameter List<Schema> in class %s.",
schemaProviderName,theClass.getName()),e);
"Cannot find schema provider method with name %s and parameter List<SqlType> in class "
+ "%s.", schemaProviderName,theClass.getName()),e);
}
}

private Schema invokeSchemaProviderMethod(final Object instance, final Method m,
final List<Schema> args, final UdfDescription annotation) {
private SqlType invokeSchemaProviderMethod(final Object instance,
final Method m,
final List<SqlType> args,
final UdfDescription annotation) {
try {
return (Schema) m.invoke(instance, args);
return (SqlType) m.invoke(instance, args);
} catch (IllegalAccessException
| InvocationTargetException e) {
throw new KsqlException(String.format("Cannot invoke the schema provider "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.util.List;
import org.apache.kafka.connect.data.Schema;

@UdfDescription(name = "Abs", description = Abs.DESCRIPTION)
public class Abs {
Expand Down Expand Up @@ -54,12 +54,12 @@ public BigDecimal abs(@UdfParameter final BigDecimal val) {
}

@UdfSchemaProvider
public Schema provideSchema(final List<Schema> params) {
public SqlType provideSchema(final List<SqlType> params) {
if (params.size() != 1) {
throw new KsqlException("Abs udf accepts one parameter");
}
final Schema s = params.get(0);
if (!DecimalUtil.isDecimal(s)) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter"
+ "type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -246,9 +248,8 @@ public void shouldThrowOnReturnTypeMismatch() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(is("Return type Schema{org.apache.kafka.connect.data."
+ "Decimal:BYTES} of UDF ReturnIncompatible does not "
+ "match the declared return type Schema{STRING}."));
expectedException.expectMessage(is("Return type DECIMAL(2, 1) of UDF ReturnIncompatible does not "
+ "match the declared return type STRING."));

// When:
function.getReturnType(args);
Expand Down Expand Up @@ -301,7 +302,7 @@ public void shouldThrowOnMissingSchemaProvider() throws ClassNotFoundException {
// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(is("Cannot find schema provider method with name provideSchema "
+ "and parameter List<Schema> in class org.damian.ksql.udf."
+ "and parameter List<SqlType> in class org.damian.ksql.udf."
+ "MissingSchemaProviderUdf."));

/// When:
Expand Down Expand Up @@ -671,8 +672,8 @@ public BigDecimal foo(@UdfParameter("justValue") final BigDecimal p) {
}

@UdfSchemaProvider
public Schema provideSchema(List<Schema> params) {
return DecimalUtil.builder(2, 1).build();
public SqlType provideSchema(List<SqlType> params) {
return SqlDecimal.of(2, 1);
}
}

Expand All @@ -689,8 +690,8 @@ public String foo(@UdfParameter("justValue") final BigDecimal p) {
}

@UdfSchemaProvider
public Schema provideSchema(List<Schema> params) {
return DecimalUtil.builder(2, 1).build();
public SqlType provideSchema(List<Schema> params) {
return SqlDecimal.of(2, 1);
}
}
}
Binary file modified ksql-engine/src/test/resources/udf-failing-tests.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.DecimalUtil;
import java.math.BigDecimal;
import java.util.List;
Expand All @@ -41,7 +43,7 @@ public BigDecimal foo(@UdfParameter("justValue") final BigDecimal p) {
}

@UdfSchemaProvider
public Schema provideSchema(List<Schema> params) {
return DecimalUtil.builder(2, 1).build();
public SqlType provideSchema(List<SqlType> params) {
return SqlDecimal.of(2, 1);
}
}
4 changes: 4 additions & 0 deletions ksql-streams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-common</artifactId>
</dependency>

</dependencies>

Expand Down

0 comments on commit 1324285

Please sign in to comment.