Skip to content

Commit

Permalink
feat(udfs): generic support for UDFs (#3054)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Jul 24, 2019
1 parent 1ae12a5 commit a381c48
Show file tree
Hide file tree
Showing 30 changed files with 1,774 additions and 230 deletions.
12 changes: 11 additions & 1 deletion docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ Map
.. note:: The ``DELIMITED`` format doesn't support maps.

KSQL supports fields that are maps. A map has a key and value type. All of the keys must be of the
same type, and all of the values must be also be of the same type. Currently only ``STRING`` keys are supported. The value type can be any valid KSQL type.
same type, and all of the values must also be of the same type. Currently only ``STRING`` keys
are supported. The value type can be any valid KSQL type.

You can define maps within a ``CREATE TABLE`` or ``CREATE STREAM`` statement by using the syntax
``MAP<KeyType, ValueType>``. For example, ``MAP<STRING, INT>`` defines a map with string keys and
Expand Down Expand Up @@ -1528,6 +1529,12 @@ Scalar functions
| ARRAYCONTAINS | ``ARRAYCONTAINS('[1, 2, 3]', 3)`` | Given a JSON or AVRO array, checks whether it |
| | | contains the search value. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| AS_ARRAY | ``AS_ARRAY(col1, col2)`` | Construct an array from a variable number of |
| | | inputs. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| AS_MAP | ``AS_MAP(keys, vals)`` | Construct a map from a list of keys and a list of |
| | | values. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| CEIL | ``CEIL(col1)`` | The ceiling of a value. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| CONCAT | ``CONCAT(col1, '_hello')`` | Concatenate two strings. |
Expand Down Expand Up @@ -1642,6 +1649,9 @@ Scalar functions
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| SQRT | ``SQRT(col1)`` | The square root of a value. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| SLICE | ``SLICE(col1, from, to)`` | Slices a list based on the supplied indices. The |
| | | indices start at 1 and include both endpoints. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| SPLIT | ``SPLIT(col1, delimiter)`` | Splits a string into an array of substrings based |
| | | on a delimiter. If the delimiter is not found, |
| | | then the original string is returned as the only |
Expand Down
10 changes: 10 additions & 0 deletions docs/developer-guide/udf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ The KSQL server will check the value being passed to each parameter and report a
log for any null values being passed to a primitive type. The associated column in the output row
will be ``null``.

Generics in UDFs
~~~~~~~~~~~~~~~~

A UDF declaration can utilize generics if they match the following conditions:

- Any generic in the return value of a method must appear in at least one of the method parameters.
- The generic must not adhere to any interface. For example, ``<T extends Number>`` is not valid.
- The generic does not support type coercion or inheritance. For example, ``add(T a, T b)`` will
accept ``BIGINT, BIGINT`` but not ``INT, BIGINT``.

.. _example-udf-class:

Example UDF class
Expand Down
266 changes: 266 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/function/GenericsUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlPreconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class GenericsUtil {

private static final String PREFIX = "<";
private static final String SUFFIX = ">";
private static final Pattern GENERIC_PATTERN = Pattern.compile("<(?<name>.*)>");

private GenericsUtil() { }

/**
* @param typeName the generic type name (e.g. {@code T})
* @return a {@link SchemaBuilder} for a generic type with that name
*/
public static SchemaBuilder generic(final String typeName) {
return SchemaBuilder.bytes().optional().name(PREFIX + typeName + SUFFIX);
}

/**
* @param typeName the generic type name (e.g. {@code T})
* @return a {@link SchemaBuilder} for a generic array with generic element type
*/
public static SchemaBuilder array(final String typeName) {
return SchemaBuilder.array(generic(typeName).build()).optional();
}

/**
* @param keyType the type for the map key
* @param valueTypeName the generic type name (e.g. {@code T}) for the map value
* @return a {@link SchemaBuilder} for a map with {@code keyType} keys and generic value type
*/
public static SchemaBuilder map(final Schema keyType, final String valueTypeName) {
return SchemaBuilder.map(keyType, generic(valueTypeName).build()).optional();
}

/**
* @param type the type
* @return whether or not {@code type} is a generic type
* @apiNote container generics (e.g. {@code ARRAY<T>}) will return {@code false},
* use {@link #constituentGenerics(Schema)}
*/
public static boolean isGeneric(final Schema type) {
return type.name() != null
&& type.name().startsWith(PREFIX)
&& type.name().endsWith(SUFFIX);
}

/**
* @param type the type
* @return all generics contained within the type, for example: {@code Map<K, V>} would return
* a set containing {@code {<K>, <V>}}
*/
public static Set<Schema> constituentGenerics(final Schema type) {
switch (type.type()) {
case ARRAY:
return constituentGenerics(type.valueSchema());
case MAP:
return Sets.union(
constituentGenerics(type.keySchema()),
constituentGenerics(type.valueSchema()));
case STRUCT:
return type.fields().stream()
.map(Field::schema)
.map(GenericsUtil::constituentGenerics)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
case BYTES:
if (isGeneric(type)) {
return ImmutableSet.of(type);
}
return ImmutableSet.of();
default:
return ImmutableSet.of();
}
}

/**
* @param type the schema
* @return whether or not there are any generics contained in {@code type}
*/
public static boolean hasGenerics(final Schema type) {
return !constituentGenerics(type).isEmpty();
}

/**
* Replaces all generics in a schema with concrete schemas defined in {@code resolved}
*
* @param schema the schema which may contain generics
* @param resolved the mapping from generics to resolved types
* @return a schema with the same structure as {@code schema} but with no generics
*
* @throws KsqlException if there is a generic in {@code schema} that is not present
* in {@code mapping}
*/
public static Schema applyResolved(final Schema schema, final Map<Schema, Schema> resolved) {
switch (schema.type()) {
case ARRAY:
return SchemaBuilder
.array(applyResolved(schema.valueSchema(), resolved))
.optional()
.build();
case MAP:
return SchemaBuilder
.map(
applyResolved(schema.keySchema(), resolved),
applyResolved(schema.valueSchema(), resolved))
.optional()
.build();
case BYTES:
if (!isGeneric(schema)) {
return schema;
}

final Schema instance = resolved.get(schema);
if (instance == null) {
throw new KsqlException("Could not find mapping for generic type: " + schema);
}
return instance;
default:
return schema;
}
}

/**
* Identifies a mapping from generic type to concrete type based on a {@code schema} and
* an {@code instance}, where the {@code instance} schema is expected to have no generic
* types and have the same nested structure as {@code schema}.
*
* @param schema the schema that may contain generics
* @param instance a schema with the same structure as {@code schema} but with no generics
*
* @return a mapping from generic type to resolved type
*/
public static Map<Schema, Schema> resolveGenerics(
final Schema schema,
final Schema instance
) {
final List<Entry<Schema, Schema>> genericMapping = new ArrayList<>();
final boolean success = resolveGenerics(genericMapping, schema, instance);
if (!success) {
throw new KsqlException(
String.format("Cannot infer generics for %s from %s because "
+ "they do not have the same schema structure.",
schema,
instance));
}

final Map<Schema, Schema> mapping = new HashMap<>();
for (final Entry<Schema, Schema> entry : genericMapping) {
final Schema old = mapping.putIfAbsent(entry.getKey(), entry.getValue());
if (old != null && !old.equals(entry.getValue())) {
throw new KsqlException(String.format(
"Found invalid instance of generic schema. Cannot map %s to both %s and %s",
schema.name(),
old,
instance));
}
}

return ImmutableMap.copyOf(mapping);
}

private static boolean resolveGenerics(
final List<Entry<Schema, Schema>> mapping,
final Schema schema,
final Schema instance
) {
if (!isGeneric(schema) && instance.type() != schema.type()) {
// cannot identify from type mismatch
return false;
} else if (!hasGenerics(schema)) {
// nothing left to identify
return true;
}

KsqlPreconditions.checkArgument(
isGeneric(schema) || (instance.type() == schema.type()),
"Cannot resolve generics if the schema and instance have differing types: "
+ schema + " vs. " + instance);
switch (schema.type()) {
case BYTES:
mapping.add(new HashMap.SimpleEntry<>(schema, instance));
return true;
case ARRAY:
return resolveGenerics(mapping, schema.valueSchema(), instance.valueSchema());
case MAP:
return resolveGenerics(mapping, schema.keySchema(), instance.keySchema())
&& resolveGenerics(mapping, schema.valueSchema(), instance.valueSchema());
case STRUCT:
throw new KsqlException("Generic STRUCT is not yet supported");
default:
return true;
}
}

/**
* @param schema the schema with generics
* @param instance a schema without generics
* @return whether {@code instance} conforms to the structure of {@code schema}
*/
public static boolean instanceOf(final Schema schema, final Schema instance) {
final List<Entry<Schema, Schema>> mappings = new ArrayList<>();

if (!resolveGenerics(mappings, schema, instance)) {
return false;
}

final Map<Schema, Schema> asMap = new HashMap<>();
for (final Entry<Schema, Schema> entry : mappings) {
final Schema old = asMap.putIfAbsent(entry.getKey(), entry.getValue());
if (old != null && !old.equals(entry.getValue())) {
return false;
}
}

return true;
}

/**
* @param schema the schema to extract the name for
* @return the name of {@code schema}
* @throws KsqlException if {@code schema} is not a generic schema
*/
public static String name(final Schema schema) {
final Matcher matcher = GENERIC_PATTERN.matcher(schema.name());
if (matcher.matches()) {
return matcher.group("name");
}

throw new KsqlException("Cannot extract name from non-generic schema: " + schema);
}
}
Loading

0 comments on commit a381c48

Please sign in to comment.