Skip to content

Commit

Permalink
fix: schema converters should handle List and Map impl (#3290)
Browse files Browse the repository at this point in the history
The Java to Sql schema converter should handle converting `HashMap` -> `MAP` and `ArrayList` -> `ARRAY` etc, not just the interface types themselves.
  • Loading branch information
big-andy-coates authored Sep 3, 2019
1 parent 3634895 commit af779dc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand Down Expand Up @@ -285,12 +286,11 @@ private static class JavaToSqlConverter implements JavaToSqlTypeConverter {

@Override
public SqlBaseType toSqlType(final Class<?> javaType) {
final SqlBaseType sqlType = JAVA_TO_SQL.get(javaType);
if (sqlType == null) {
throw new KsqlException("Unexpected java type: " + javaType);
}

return sqlType;
return JAVA_TO_SQL.entrySet().stream()
.filter(e -> e.getKey().isAssignableFrom(javaType))
.map(Entry::getValue)
.findAny()
.orElseThrow(() -> new KsqlException("Unexpected java type: " + javaType));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.schema.ksql.types.SqlArray;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
Expand All @@ -31,6 +33,8 @@
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -177,6 +181,26 @@ public void shouldGetSqlTypeForAllJavaTypes() {
});
}

@Test
public void shouldGetSqArrayForImplementationsOfJavaList() {
ImmutableList.<Class<?>>of(
ArrayList.class,
ImmutableList.class
).forEach(javaType -> {
assertThat(SchemaConverters.javaToSqlConverter().toSqlType(javaType), is(SqlBaseType.ARRAY));
});
}

@Test
public void shouldGetSqlMapForImplementationsOfJavaMap() {
ImmutableList.<Class<?>>of(
HashMap.class,
ImmutableMap.class
).forEach(javaType -> {
assertThat(SchemaConverters.javaToSqlConverter().toSqlType(javaType), is(SqlBaseType.MAP));
});
}

@Test
public void shouldConvertNestedComplexToSql() {
assertThat(SchemaConverters.connectToSqlConverter().toSqlType(NESTED_LOGICAL_TYPE), is(NESTED_SQL_TYPE));
Expand Down

0 comments on commit af779dc

Please sign in to comment.