Skip to content

Commit

Permalink
refactor: remove CUSTOM from SqlTypeBase (#3293)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Sep 4, 2019
1 parent 12a2e42 commit 6a3a2b9
Show file tree
Hide file tree
Showing 18 changed files with 138 additions and 399 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* The SQL types supported by KSQL.
*/
public enum SqlBaseType {
BOOLEAN, INTEGER, BIGINT, DOUBLE, DECIMAL, STRING, ARRAY, MAP, STRUCT, CUSTOM;
BOOLEAN, INTEGER, BIGINT, DOUBLE, DECIMAL, STRING, ARRAY, MAP, STRUCT;

public boolean isNumber() {
// for now, conversions between DECIMAL and other numeric types is not supported
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ public void shouldHaveConnectTestsForAllSqlTypes() {
final Set<SqlBaseType> tested = SQL_TO_LOGICAL.keySet().stream()
.map(SqlType::baseType)
.collect(Collectors.toSet());
// we cannot resolve unknown types in the converters
tested.add(SqlBaseType.CUSTOM);

final ImmutableSet<SqlBaseType> allTypes = ImmutableSet.copyOf(SqlBaseType.values());

Expand Down Expand Up @@ -155,8 +153,6 @@ public void shouldGetSqlTypeForEveryLogicalType() {
@Test
public void shouldHaveJavaTestsForAllSqlTypes() {
final Set<SqlBaseType> tested = new HashSet<>(SQL_TO_JAVA.keySet());
// we cannot resolve unknown types in the converters
tested.add(SqlBaseType.CUSTOM);

final ImmutableSet<SqlBaseType> allTypes = ImmutableSet.copyOf(SqlBaseType.values());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.function.udaf.TableUdaf;
import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdfArgSupplier;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.TypeContextUtil;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -154,13 +155,15 @@ public static UdfInvoker compile(final Method method, final ClassLoader loader)
final Schema argSchema = paramSchema.isEmpty()
? UdfUtil.getSchemaFromType(valueAndAggregateTypes.left)
: SchemaConverters.sqlToConnectConverter()
.toConnectSchema(TypeContextUtil.getType(paramSchema).getSqlType());
.toConnectSchema(
TypeContextUtil.getType(paramSchema, TypeRegistry.EMPTY).getSqlType());
final List<Schema> args = Collections.singletonList(argSchema);

final Schema returnValue = returnSchema.isEmpty()
? SchemaUtil.ensureOptional(UdfUtil.getSchemaFromType(valueAndAggregateTypes.right))
: SchemaConverters.sqlToConnectConverter()
.toConnectSchema(TypeContextUtil.getType(returnSchema).getSqlType());
.toConnectSchema(
TypeContextUtil.getType(returnSchema, TypeRegistry.EMPTY).getSqlType());

return evaluator.apply(args, returnValue, metrics);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.TypeContextUtil;
Expand Down Expand Up @@ -316,7 +317,7 @@ private void addFunction(final Class theClass,
if (annotation.isPresent() && !annotation.get().schema().isEmpty()) {
return SchemaConverters.sqlToConnectConverter()
.toConnectSchema(
TypeContextUtil.getType(annotation.get().schema()).getSqlType(),
TypeContextUtil.getType(annotation.get().schema(), TypeRegistry.EMPTY).getSqlType(),
name,
doc);
}
Expand Down Expand Up @@ -491,7 +492,8 @@ private static Schema getReturnType(final Method method, final Udf udfAnnotation
? UdfUtil.getSchemaFromType(method.getGenericReturnType())
: SchemaConverters
.sqlToConnectConverter()
.toConnectSchema(TypeContextUtil.getType(udfAnnotation.schema()).getSqlType());
.toConnectSchema(
TypeContextUtil.getType(udfAnnotation.schema(), TypeRegistry.EMPTY).getSqlType());

return SchemaUtil.ensureOptional(returnType);
} catch (final KsqlException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.schema.ksql.inference;

import com.google.common.collect.Iterables;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.SchemaParser;
import io.confluent.ksql.parser.SqlFormatter;
Expand Down Expand Up @@ -139,7 +140,8 @@ private static TableElements buildElements(
) {
try {
throwOnInvalidSchema(schema);
return SchemaParser.parse(FORMATTER.format(schema));
// custom types cannot be injected, so we can pass in an EMPTY type registry
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY);
} catch (final Exception e) {
throw new KsqlStatementException(
"Failed to convert schema to KSQL model: " + e.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.ksql.execution.expression.tree.Type;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
Expand Down Expand Up @@ -116,7 +117,7 @@ private static Class<? extends DataSource> toType(final String type) {

private static Optional<Schema> parseSchema(final String schema) {
return Optional.ofNullable(schema)
.map(TypeContextUtil::getType)
.map(schemaString -> TypeContextUtil.getType(schemaString, TypeRegistry.EMPTY))
.map(Type::getSqlType)
.map(SchemaConverters.sqlToConnectConverter()::toConnectSchema)
.map(SourceNode::makeTopLevelStructNoneOptional);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import io.confluent.ksql.execution.expression.tree.Type;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.schema.ksql.TypeContextUtil;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.test.model.KeyFieldNode;
Expand Down Expand Up @@ -80,7 +81,7 @@ private static Optional<SqlType> buildLegacySchema(

try {
return Optional.ofNullable(valueSchema)
.map(TypeContextUtil::getType)
.map(schema -> TypeContextUtil.getType(schema, TypeRegistry.EMPTY))
.map(Type::getSqlType);
} catch (final Exception e) {
throw new InvalidFieldException("legacySchema", "Failed to parse: " + valueSchema, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.metastore;

import com.google.common.collect.Iterators;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.util.Iterator;
import java.util.Objects;
Expand Down Expand Up @@ -72,4 +73,27 @@ public SqlType getType() {
}
}

/**
* An empty type registry that does not support registering or deleting types.
*/
TypeRegistry EMPTY = new TypeRegistry() {
@Override
public void registerType(final String name, final SqlType type) { }

@Override
public boolean deleteType(final String name) {
return false;
}

@Override
public Optional<SqlType> resolveType(final String name) {
return Optional.empty();
}

@Override
public Iterator<CustomType> types() {
return Iterators.forArray();
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class TypeRegistryImpl implements TypeRegistry {
public class TypeRegistryImpl implements TypeRegistry {

private final Map<String, SqlType> typeRegistry = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ public Node visitCast(final SqlBaseParser.CastContext context) {
return new Cast(
getLocation(context),
(Expression) visit(context.expression()),
getType(context.type())
getType(context.type(), metaStore)
);
}

Expand Down Expand Up @@ -1057,7 +1057,7 @@ public Node visitTableElement(final SqlBaseParser.TableElementContext context) {
getLocation(context),
context.KEY() == null ? Namespace.VALUE : Namespace.KEY,
ParserUtil.getIdentifierText(context.identifier()),
getType(context.type())
getType(context.type(), metaStore)
);
}

Expand Down Expand Up @@ -1156,7 +1156,7 @@ public Node visitRegisterType(final RegisterTypeContext context) {
return new RegisterType(
getLocation(context),
ParserUtil.getIdentifierText(context.identifier()),
getType(context.type())
getType(context.type(), metaStore)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.parser.rewrite.CustomTypeRewriter;
import io.confluent.ksql.parser.rewrite.StatementRewriteForStruct;
import io.confluent.ksql.parser.tree.Statement;
import java.util.List;
Expand Down Expand Up @@ -74,11 +73,7 @@ public PreparedStatement<?> prepare(
) {
try {
final AstBuilder astBuilder = new AstBuilder(metaStore);
Statement root = astBuilder.build(stmt.getStatement());

if (CustomTypeRewriter.requiresRewrite(root)) {
root = new CustomTypeRewriter(metaStore, root).rewrite();
}
final Statement root = astBuilder.build(stmt.getStatement());

if (!StatementRewriteForStruct.requiresRewrite(root)) {
return PreparedStatement.of(stmt.getStatementText(), root);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static io.confluent.ksql.schema.ksql.TypeContextUtil.getType;
import static io.confluent.ksql.util.ParserUtil.getLocation;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TableElements;
Expand All @@ -35,7 +37,12 @@ public final class SchemaParser {

private SchemaParser() { }

public static TableElements parse(final String schema) {
@VisibleForTesting
static TableElements parse(final String schema) {
return parse(schema, TypeRegistry.EMPTY);
}

public static TableElements parse(final String schema, final TypeRegistry typeRegistry) {
if (schema.trim().isEmpty()) {
return TableElements.of();
}
Expand Down Expand Up @@ -76,7 +83,7 @@ public void syntaxError(
getLocation(ctx),
ctx.KEY() == null ? Namespace.VALUE : Namespace.KEY,
ParserUtil.getIdentifierText(ctx.identifier()),
getType(ctx.type())
getType(ctx.type(), typeRegistry)
))
.collect(Collectors.toList());

Expand Down
Loading

0 comments on commit 6a3a2b9

Please sign in to comment.