From eed99123a7f673eec9c1f5224c164a5737e6dc41 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 19 Mar 2020 11:18:01 -0700 Subject: [PATCH] fix: add deserializer for SqlType (#4830) --- .../json/KsqlTypesSerializationModule.java | 2 + .../ksql/json/SqlTypeSchemaSerializer.java | 35 ++++++ .../execution/json/PlanJsonMapperTest.java | 11 +- .../json/KsqlTypesDeserializationModule.java | 6 +- .../ksql/parser/json/SqlTypeDeserializer.java | 39 +++++++ .../parser/json/KsqlTypesSerdeModuleTest.java | 106 ++++++++++++++++++ 6 files changed, 197 insertions(+), 2 deletions(-) create mode 100644 ksqldb-common/src/main/java/io/confluent/ksql/json/SqlTypeSchemaSerializer.java create mode 100644 ksqldb-parser/src/main/java/io/confluent/ksql/parser/json/SqlTypeDeserializer.java create mode 100644 ksqldb-parser/src/test/java/io/confluent/ksql/parser/json/KsqlTypesSerdeModuleTest.java diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/json/KsqlTypesSerializationModule.java b/ksqldb-common/src/main/java/io/confluent/ksql/json/KsqlTypesSerializationModule.java index 375743e99a6c..e200dc770dc2 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/json/KsqlTypesSerializationModule.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/json/KsqlTypesSerializationModule.java @@ -17,10 +17,12 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlType; public final class KsqlTypesSerializationModule extends SimpleModule { public KsqlTypesSerializationModule() { addSerializer(LogicalSchema.class, new LogicalSchemaSerializer()); + addSerializer(SqlType.class, new SqlTypeSchemaSerializer()); } } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/json/SqlTypeSchemaSerializer.java b/ksqldb-common/src/main/java/io/confluent/ksql/json/SqlTypeSchemaSerializer.java new file mode 100644 index 000000000000..aafa0c049e71 --- /dev/null +++ b/ksqldb-common/src/main/java/io/confluent/ksql/json/SqlTypeSchemaSerializer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.json; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import io.confluent.ksql.schema.ksql.FormatOptions; +import io.confluent.ksql.schema.ksql.types.SqlType; +import java.io.IOException; + +public class SqlTypeSchemaSerializer extends JsonSerializer { + + @Override + public void serialize( + final SqlType value, + final JsonGenerator gen, + final SerializerProvider serializers + ) throws IOException { + gen.writeString(value.toString(FormatOptions.none())); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/execution/json/PlanJsonMapperTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/execution/json/PlanJsonMapperTest.java index 741f5735d903..cc8eaf266fd5 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/execution/json/PlanJsonMapperTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/execution/json/PlanJsonMapperTest.java @@ -1,10 +1,12 @@ package io.confluent.ksql.execution.json; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.ksql.parser.json.KsqlTypesDeserializationModule; import org.junit.Test; public class PlanJsonMapperTest { @@ -29,4 +31,11 @@ public void shouldEnableFailOnNullProperties() { public void shouldEnableFailOnInvalidSubtype() { assertThat(MAPPER.isEnabled(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE), is(true)); } + + @Test + public void shouldHaveTypeMapperRegistered() { + assertThat( + MAPPER.getRegisteredModuleIds(), + hasItem(new KsqlTypesDeserializationModule(false).getTypeId())); + } } \ No newline at end of file diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/json/KsqlTypesDeserializationModule.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/json/KsqlTypesDeserializationModule.java index e8c7277743c7..1340d1bdf6a4 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/json/KsqlTypesDeserializationModule.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/json/KsqlTypesDeserializationModule.java @@ -17,10 +17,14 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlType; public class KsqlTypesDeserializationModule extends SimpleModule { - public KsqlTypesDeserializationModule(final boolean withImplicitColumns) { + public KsqlTypesDeserializationModule( + final boolean withImplicitColumns + ) { addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer(withImplicitColumns)); + addDeserializer(SqlType.class, new SqlTypeDeserializer()); } } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/json/SqlTypeDeserializer.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/json/SqlTypeDeserializer.java new file mode 100644 index 000000000000..5609803ceaa2 --- /dev/null +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/json/SqlTypeDeserializer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.json; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import io.confluent.ksql.metastore.TypeRegistry; +import io.confluent.ksql.schema.ksql.SqlTypeParser; +import io.confluent.ksql.schema.ksql.types.SqlType; +import java.io.IOException; + +public class SqlTypeDeserializer extends JsonDeserializer { + + public SqlTypeDeserializer() { + } + + @Override + public SqlType deserialize( + final JsonParser p, + final DeserializationContext ctxt + ) throws IOException { + final String text = p.readValueAs(String.class); + return SqlTypeParser.create(TypeRegistry.EMPTY).parse(text).getSqlType(); + } +} diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/json/KsqlTypesSerdeModuleTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/json/KsqlTypesSerdeModuleTest.java new file mode 100644 index 000000000000..4a3b4e9acf1a --- /dev/null +++ b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/json/KsqlTypesSerdeModuleTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.json; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.ksql.json.KsqlTypesSerializationModule; +import io.confluent.ksql.schema.ksql.types.SqlArray; +import io.confluent.ksql.schema.ksql.types.SqlMap; +import io.confluent.ksql.schema.ksql.types.SqlStruct; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import org.hamcrest.CoreMatchers; +import org.junit.Test; + +public class KsqlTypesSerdeModuleTest { + + private static final ObjectMapper MAPPER = new ObjectMapper() + .registerModule(new KsqlTypesDeserializationModule(false)) + .registerModule(new KsqlTypesSerializationModule()); + + @Test + public void shouldSerDeSqlPrimitiveTypes() throws JsonProcessingException { + // Given: + final SqlType[] types = new SqlType[]{ + SqlTypes.INTEGER, + SqlTypes.BIGINT, + SqlTypes.DOUBLE, + SqlTypes.STRING + }; + + for (final SqlType type : types) { + // When: + final SqlType out = MAPPER.readValue(MAPPER.writeValueAsString(type), SqlType.class); + + // Then + assertThat(out, is(type)); + } + } + + @Test + public void shouldSerDeSqlArrayTypes() throws JsonProcessingException { + // Given: + final SqlType[] types = new SqlType[]{ + SqlTypes.INTEGER, + SqlTypes.BIGINT, + SqlTypes.DOUBLE, + SqlTypes.STRING + }; + + for (final SqlType type : types) { + // When: + final SqlType out = MAPPER.readValue(MAPPER.writeValueAsString(SqlArray.of(type)), SqlType.class); + + // Then + assertThat(out, is(SqlArray.of(type))); + } + } + + @Test + public void shouldSerDeSqlMapTypes() throws JsonProcessingException { + // Given: + final SqlType[] types = new SqlType[]{ + SqlTypes.INTEGER, + SqlTypes.BIGINT, + SqlTypes.DOUBLE, + SqlTypes.STRING + }; + + for (final SqlType type : types) { + // When: + final SqlType out = MAPPER.readValue(MAPPER.writeValueAsString(SqlMap.of(type)), SqlType.class); + + // Then + assertThat(out, is(SqlMap.of(type))); + } + } + + @Test + public void shouldSerDeStructType() throws JsonProcessingException { + // Given: + SqlStruct struct = SqlStruct.builder().field("foo", SqlArray.of(SqlTypes.STRING)).build(); + + // When: + final SqlType out = MAPPER.readValue(MAPPER.writeValueAsString(struct), SqlType.class); + + // Then: + assertThat(out, is(struct)); + } +} \ No newline at end of file