From c571508109791d52f72b7e93759e6d1347e95599 Mon Sep 17 00:00:00 2001
From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
Date: Wed, 11 Sep 2019 09:09:01 +0100
Subject: [PATCH] feat(static): add custom jackson JSON serde for handling
LogicalSchema (#3322)
* feat(static): add custom jackson JSON serde for handling LogicalSchema
---
.../io/confluent/ksql/json/JsonMapper.java | 1 +
.../json/KsqlTypesSerializationModule.java | 26 +++++++++
.../ksql/json/LogicalSchemaSerializer.java | 44 ++++++++++++++
.../json/LogicalSchemaSerializerTest.java | 58 +++++++++++++++++++
ksql-rest-client/pom.xml | 5 ++
.../ksql/rest/client/KsqlRestClient.java | 5 ++
.../json/KsqlTypesDeserializationModule.java | 26 +++++++++
.../json/LogicalSchemaDeserializer.java | 41 +++++++++++++
.../json/LogicalSchemaDeserializerTest.java | 58 +++++++++++++++++++
9 files changed, 264 insertions(+)
create mode 100644 ksql-common/src/main/java/io/confluent/ksql/json/KsqlTypesSerializationModule.java
create mode 100644 ksql-common/src/main/java/io/confluent/ksql/json/LogicalSchemaSerializer.java
create mode 100644 ksql-common/src/test/java/io/confluent/ksql/json/LogicalSchemaSerializerTest.java
create mode 100644 ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/KsqlTypesDeserializationModule.java
create mode 100644 ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializer.java
create mode 100644 ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializerTest.java
diff --git a/ksql-common/src/main/java/io/confluent/ksql/json/JsonMapper.java b/ksql-common/src/main/java/io/confluent/ksql/json/JsonMapper.java
index 47cf44716eb4..5f121518f237 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/json/JsonMapper.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/json/JsonMapper.java
@@ -29,6 +29,7 @@ public enum JsonMapper {
JsonMapper() {
mapper.registerModule(new Jdk8Module());
mapper.registerModule(new StructSerializationModule());
+ mapper.registerModule(new KsqlTypesSerializationModule());
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}
}
\ No newline at end of file
diff --git a/ksql-common/src/main/java/io/confluent/ksql/json/KsqlTypesSerializationModule.java b/ksql-common/src/main/java/io/confluent/ksql/json/KsqlTypesSerializationModule.java
new file mode 100644
index 000000000000..375743e99a6c
--- /dev/null
+++ b/ksql-common/src/main/java/io/confluent/ksql/json/KsqlTypesSerializationModule.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.json;
+
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+
+public final class KsqlTypesSerializationModule extends SimpleModule {
+
+ public KsqlTypesSerializationModule() {
+ addSerializer(LogicalSchema.class, new LogicalSchemaSerializer());
+ }
+}
diff --git a/ksql-common/src/main/java/io/confluent/ksql/json/LogicalSchemaSerializer.java b/ksql-common/src/main/java/io/confluent/ksql/json/LogicalSchemaSerializer.java
new file mode 100644
index 000000000000..d8a1751b6630
--- /dev/null
+++ b/ksql-common/src/main/java/io/confluent/ksql/json/LogicalSchemaSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.json;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import java.io.IOException;
+
+/**
+ * Custom Jackson JSON serializer for {@link LogicalSchema}.
+ *
+ *
The schema is serialized as a simple SQL string
+ */
+final class LogicalSchemaSerializer extends JsonSerializer {
+
+ @Override
+ public void serialize(
+ final LogicalSchema schema,
+ final JsonGenerator gen,
+ final SerializerProvider serializerProvider
+ ) throws IOException {
+ final String text = schema.toString();
+ gen.writeString(trimArrayBrackets(text));
+ }
+
+ private static String trimArrayBrackets(final String text) {
+ return text.substring(1, text.length() - 1);
+ }
+}
diff --git a/ksql-common/src/test/java/io/confluent/ksql/json/LogicalSchemaSerializerTest.java b/ksql-common/src/test/java/io/confluent/ksql/json/LogicalSchemaSerializerTest.java
new file mode 100644
index 000000000000..d2b7b2c7db51
--- /dev/null
+++ b/ksql-common/src/test/java/io/confluent/ksql/json/LogicalSchemaSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.json;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.types.SqlTypes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LogicalSchemaSerializerTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @BeforeClass
+ public static void classSetUp() {
+ MAPPER.registerModule(new TestModule());
+ }
+
+ @Test
+ public void shouldSchemaAsString() throws Exception {
+ // Given:
+ final LogicalSchema schema = LogicalSchema.builder()
+ .keyField("key0", SqlTypes.STRING)
+ .valueField("v0", SqlTypes.INTEGER)
+ .build();
+
+ // When:
+ final String json = MAPPER.writeValueAsString(schema);
+
+ // Then:
+ assertThat(json, is("\"`key0` STRING KEY, `v0` INTEGER\""));
+ }
+
+ private static final class TestModule extends SimpleModule {
+
+ TestModule() {
+ addSerializer(LogicalSchema.class, new LogicalSchemaSerializer());
+ }
+ }
+}
\ No newline at end of file
diff --git a/ksql-rest-client/pom.xml b/ksql-rest-client/pom.xml
index a8a8e45e9ce8..2a4f27562c68 100644
--- a/ksql-rest-client/pom.xml
+++ b/ksql-rest-client/pom.xml
@@ -39,6 +39,11 @@
ksql-rest-model
+
+ io.confluent.ksql
+ ksql-parser
+
+
io.confluent
rest-utils
diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java
index 254e5cf2f1c9..96b0913f4fe4 100644
--- a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java
+++ b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java
@@ -22,6 +22,7 @@
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.Errors;
+import io.confluent.ksql.rest.client.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.rest.client.ssl.DefaultSslClientConfigurer;
import io.confluent.ksql.rest.client.ssl.SslClientConfigurer;
import io.confluent.ksql.rest.entity.CommandStatus;
@@ -83,6 +84,10 @@ public class KsqlRestClient implements Closeable {
new AuthenticationException("You are forbidden from using this cluster.")
);
+ static {
+ JsonMapper.INSTANCE.mapper.registerModule(new KsqlTypesDeserializationModule());
+ }
+
private final Client client;
private List serverAddresses;
diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/KsqlTypesDeserializationModule.java b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/KsqlTypesDeserializationModule.java
new file mode 100644
index 000000000000..992010afd770
--- /dev/null
+++ b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/KsqlTypesDeserializationModule.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rest.client.json;
+
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+
+public class KsqlTypesDeserializationModule extends SimpleModule {
+
+ public KsqlTypesDeserializationModule() {
+ addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer());
+ }
+}
diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializer.java b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializer.java
new file mode 100644
index 000000000000..f7f01d1e27b8
--- /dev/null
+++ b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializer.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rest.client.json;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import io.confluent.ksql.metastore.TypeRegistry;
+import io.confluent.ksql.parser.SchemaParser;
+import io.confluent.ksql.parser.tree.TableElements;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import java.io.IOException;
+
+final class LogicalSchemaDeserializer extends JsonDeserializer {
+
+ @Override
+ public LogicalSchema deserialize(
+ final JsonParser jp,
+ final DeserializationContext ctx
+ ) throws IOException {
+
+ final String text = jp.readValueAs(String.class);
+
+ final TableElements tableElements = SchemaParser.parse(text, TypeRegistry.EMPTY);
+
+ return tableElements.toLogicalSchema();
+ }
+}
diff --git a/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializerTest.java b/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializerTest.java
new file mode 100644
index 000000000000..6512e30e7486
--- /dev/null
+++ b/ksql-rest-client/src/test/java/io/confluent/ksql/rest/client/json/LogicalSchemaDeserializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.rest.client.json;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.types.SqlTypes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LogicalSchemaDeserializerTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @BeforeClass
+ public static void classSetUp() {
+ MAPPER.registerModule(new TestModule());
+ }
+
+ @Test
+ public void shouldDeserializeSchema() throws Exception {
+ // Given:
+ final String json = "\"`ROWKEY` STRING KEY, `v0` INTEGER\"";
+
+ // When:
+ final LogicalSchema schema = MAPPER.readValue(json, LogicalSchema.class);
+
+ // Then:
+ assertThat(schema, is(LogicalSchema.builder()
+ .keyField("ROWKEY", SqlTypes.STRING)
+ .valueField("v0", SqlTypes.INTEGER)
+ .build()));
+ }
+
+ private static class TestModule extends SimpleModule {
+
+ private TestModule() {
+ addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer());
+ }
+ }
+}