Skip to content

Commit

Permalink
feat: custom comparators for array, map and struct (#3385)
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas authored Sep 27, 2019
1 parent e13cb46 commit fe63d21
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 17 deletions.
58 changes: 46 additions & 12 deletions ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Ordering;
import io.confluent.ksql.function.GenericsUtil;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.Operator;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
Expand Down Expand Up @@ -91,9 +92,9 @@ public final class SchemaUtil {

private static final Map<Type, BiPredicate<Schema, Schema>> CUSTOM_SCHEMA_EQ =
ImmutableMap.<Type, BiPredicate<Schema, Schema>>builder()
.put(Type.MAP, SchemaUtil::mapEquals)
.put(Type.ARRAY, SchemaUtil::arrayEquals)
.put(Type.STRUCT, SchemaUtil::structEquals)
.put(Type.MAP, SchemaUtil::mapCompatible)
.put(Type.ARRAY, SchemaUtil::arrayCompatible)
.put(Type.STRUCT, SchemaUtil::structCompatible)
.put(Type.BYTES, SchemaUtil::bytesEquals)
.build();

Expand Down Expand Up @@ -349,25 +350,58 @@ public static boolean areCompatible(final Schema arg1, final Schema arg2) {
&& Objects.deepEquals(arg1.defaultValue(), arg2.defaultValue());
}

private static boolean mapEquals(final Schema mapA, final Schema mapB) {
return Objects.equals(mapA.keySchema(), mapB.keySchema())
&& Objects.equals(mapA.valueSchema(), mapB.valueSchema());
private static boolean mapCompatible(final Schema mapA, final Schema mapB) {
return areCompatible(mapA.keySchema(), mapB.keySchema())
&& areCompatible(mapA.valueSchema(), mapB.valueSchema());
}

private static boolean arrayEquals(final Schema arrayA, final Schema arrayB) {
return Objects.equals(arrayA.valueSchema(), arrayB.valueSchema());
private static boolean arrayCompatible(final Schema arrayA, final Schema arrayB) {
return areCompatible(arrayA.valueSchema(), arrayB.valueSchema());
}

private static boolean structEquals(final Schema structA, final Schema structB) {
private static boolean structCompatible(final Schema structA, final Schema structB) {
return structA.fields().isEmpty()
|| structB.fields().isEmpty()
|| Objects.equals(structA.fields(), structB.fields());
|| compareFieldsOfStructs(structA, structB);
}


private static boolean compareFieldsOfStructs(final Schema structA, final Schema structB) {

final List<Field> fieldsA = structA.fields();
final List<Field> fieldsB = structB.fields();
final int sizeA = fieldsA.size();
final int sizeB = fieldsB.size();

if (sizeA != sizeB) {
return false;
}

// Custom field comparison to support comparison of structs with decimals and generics
for (int i = 0; i < sizeA; i++) {
final Field fieldA = fieldsA.get(i);
final Field fieldB = fieldsB.get(i);
if (!fieldA.name().equals(fieldB.name())
|| fieldA.index() != fieldB.index()
|| ! areCompatible(fieldsA.get(i).schema(), fieldsB.get(i).schema())) {
return false;
}
}
return true;
}

private static boolean bytesEquals(final Schema bytesA, final Schema bytesB) {
// two datatypes are currently represented as bytes: generics and decimals

if (GenericsUtil.isGeneric(bytesA)) {
if (GenericsUtil.isGeneric(bytesB)) {
return bytesA.name().equals(bytesB.name());
}
return false;
}

// from a Java schema perspective, all decimals are the same
// since they can all be cast to BigDecimal - other bytes types
// are not supported in UDFs
// since they can all be cast to BigDecimal
return DecimalUtil.isDecimal(bytesA) && DecimalUtil.isDecimal(bytesB);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,26 +764,81 @@ public void shouldFailIsNumberForString() {
public void shouldFailINonCompatibleSchemas() {
assertThat(SchemaUtil.areCompatible(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), is(false));

assertThat(SchemaUtil.areCompatible(DecimalUtil.builder(1,1).build(),
assertThat(SchemaUtil.areCompatible(DecimalUtil.builder(1, 1).build(),
Schema.BYTES_SCHEMA), is(false));

assertThat(SchemaUtil.areCompatible(GenericsUtil.generic("a").build(),
GenericsUtil.generic("b").build()), is(false));

assertThat(SchemaUtil.areCompatible(GenericsUtil.array("a").build(),
GenericsUtil.array("b").build()), is(false));

assertThat(SchemaUtil.areCompatible(SchemaBuilder.array(Schema.INT32_SCHEMA).build(),
SchemaBuilder.array(Schema.STRING_SCHEMA).build()),
is(false));

assertThat(SchemaUtil.areCompatible(
SchemaBuilder.struct().field("a", DecimalUtil.builder(1,1)).build(),
SchemaBuilder.struct().field("a", Schema.FLOAT64_SCHEMA).build()),
is(false));

assertThat(SchemaUtil.areCompatible(
SchemaBuilder.struct().field("a", GenericsUtil.generic("a").build()),
SchemaBuilder.struct().field("a", GenericsUtil.generic("b").build())),
is(false));

assertThat(SchemaUtil.areCompatible(
SchemaBuilder.map(DecimalUtil.builder(1, 1).build(),
SchemaBuilder.array(DecimalUtil.builder(2, 2).build())),
SchemaBuilder.map(Schema.FLOAT64_SCHEMA,
SchemaBuilder.array(DecimalUtil.builder(2, 2).build()))),
is(false));

assertThat(SchemaUtil.areCompatible(
SchemaBuilder.map(DecimalUtil.builder(1, 1).build(),
SchemaBuilder.array(Schema.FLOAT64_SCHEMA)),
SchemaBuilder.map(DecimalUtil.builder(1, 1).build(),
SchemaBuilder.array(DecimalUtil.builder(2, 2).build()))),
is(false));

}

@Test
public void shouldPassCompatibleSchemas() {
assertThat(SchemaUtil.areCompatible(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA),
is(true));

assertThat(SchemaUtil.areCompatible(DecimalUtil.builder(2,2),
DecimalUtil.builder(1,1)), is(true));
assertThat(SchemaUtil.areCompatible(DecimalUtil.builder(2, 2),
DecimalUtil.builder(1, 1)), is(true));

assertThat(SchemaUtil.areCompatible(GenericsUtil.array("a").build(),
GenericsUtil.array("a").build()), is(true));

assertThat(SchemaUtil.areCompatible(SchemaBuilder.array(DecimalUtil.builder(2, 2)).build(),
SchemaBuilder.array(DecimalUtil.builder(2, 2)).build()),
is(true));

assertThat(SchemaUtil.areCompatible(SchemaBuilder.array(Schema.INT32_SCHEMA).build(),
SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).build()),
is(true));

assertThat(SchemaUtil.areCompatible(
SchemaBuilder.struct().field("a", DecimalUtil.builder(2, 2)).build(),
SchemaBuilder.struct().field("a", DecimalUtil.builder(2, 2)).build()),
is(true));

assertThat(SchemaUtil.areCompatible(
SchemaBuilder.struct().field("a", GenericsUtil.generic("a").build()),
SchemaBuilder.struct().field("a", GenericsUtil.generic("a").build())),
is(true));

assertThat(SchemaUtil.areCompatible(
SchemaBuilder.map(DecimalUtil.builder(2, 2).build(),
SchemaBuilder.array(DecimalUtil.builder(2, 2).build())),
SchemaBuilder.map(DecimalUtil.builder(2, 2).build(),
SchemaBuilder.array(DecimalUtil.builder(2, 2).build()))),
is(true));

assertThat(SchemaUtil.areCompatible(GenericsUtil.generic("a").build(),
GenericsUtil.generic("a").build()), is(false));
}

@Test
Expand Down

0 comments on commit fe63d21

Please sign in to comment.