Skip to content

Commit

Permalink
feat(static): add custom jackson JSON serde for handling LogicalSchema (
Browse files Browse the repository at this point in the history
#3322)

* feat(static): add custom jackson JSON serde for handling LogicalSchema
  • Loading branch information
big-andy-coates authored Sep 11, 2019
1 parent 0ff4a51 commit c571508
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public enum JsonMapper {
JsonMapper() {
mapper.registerModule(new Jdk8Module());
mapper.registerModule(new StructSerializationModule());
mapper.registerModule(new KsqlTypesSerializationModule());
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.json;

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;

public final class KsqlTypesSerializationModule extends SimpleModule {

public KsqlTypesSerializationModule() {
addSerializer(LogicalSchema.class, new LogicalSchemaSerializer());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.io.IOException;

/**
* Custom Jackson JSON serializer for {@link LogicalSchema}.
*
* <p>The schema is serialized as a simple SQL string
*/
final class LogicalSchemaSerializer extends JsonSerializer<LogicalSchema> {

@Override
public void serialize(
final LogicalSchema schema,
final JsonGenerator gen,
final SerializerProvider serializerProvider
) throws IOException {
final String text = schema.toString();
gen.writeString(trimArrayBrackets(text));
}

private static String trimArrayBrackets(final String text) {
return text.substring(1, text.length() - 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.json;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.junit.BeforeClass;
import org.junit.Test;

public class LogicalSchemaSerializerTest {

private static final ObjectMapper MAPPER = new ObjectMapper();

@BeforeClass
public static void classSetUp() {
MAPPER.registerModule(new TestModule());
}

@Test
public void shouldSchemaAsString() throws Exception {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyField("key0", SqlTypes.STRING)
.valueField("v0", SqlTypes.INTEGER)
.build();

// When:
final String json = MAPPER.writeValueAsString(schema);

// Then:
assertThat(json, is("\"`key0` STRING KEY, `v0` INTEGER\""));
}

private static final class TestModule extends SimpleModule {

TestModule() {
addSerializer(LogicalSchema.class, new LogicalSchemaSerializer());
}
}
}
5 changes: 5 additions & 0 deletions ksql-rest-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>ksql-rest-model</artifactId>
</dependency>

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-parser</artifactId>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>rest-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.json.KsqlTypesDeserializationModule;
import io.confluent.ksql.rest.client.ssl.DefaultSslClientConfigurer;
import io.confluent.ksql.rest.client.ssl.SslClientConfigurer;
import io.confluent.ksql.rest.entity.CommandStatus;
Expand Down Expand Up @@ -83,6 +84,10 @@ public class KsqlRestClient implements Closeable {
new AuthenticationException("You are forbidden from using this cluster.")
);

static {
JsonMapper.INSTANCE.mapper.registerModule(new KsqlTypesDeserializationModule());
}

private final Client client;

private List<URI> serverAddresses;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;

import com.fasterxml.jackson.databind.module.SimpleModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;

public class KsqlTypesDeserializationModule extends SimpleModule {

public KsqlTypesDeserializationModule() {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.parser.SchemaParser;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.io.IOException;

final class LogicalSchemaDeserializer extends JsonDeserializer<LogicalSchema> {

@Override
public LogicalSchema deserialize(
final JsonParser jp,
final DeserializationContext ctx
) throws IOException {

final String text = jp.readValueAs(String.class);

final TableElements tableElements = SchemaParser.parse(text, TypeRegistry.EMPTY);

return tableElements.toLogicalSchema();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client.json;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.junit.BeforeClass;
import org.junit.Test;

public class LogicalSchemaDeserializerTest {

private static final ObjectMapper MAPPER = new ObjectMapper();

@BeforeClass
public static void classSetUp() {
MAPPER.registerModule(new TestModule());
}

@Test
public void shouldDeserializeSchema() throws Exception {
// Given:
final String json = "\"`ROWKEY` STRING KEY, `v0` INTEGER\"";

// When:
final LogicalSchema schema = MAPPER.readValue(json, LogicalSchema.class);

// Then:
assertThat(schema, is(LogicalSchema.builder()
.keyField("ROWKEY", SqlTypes.STRING)
.valueField("v0", SqlTypes.INTEGER)
.build()));
}

private static class TestModule extends SimpleModule {

private TestModule() {
addDeserializer(LogicalSchema.class, new LogicalSchemaDeserializer());
}
}
}

0 comments on commit c571508

Please sign in to comment.