Skip to content

Commit

Permalink
fix: support partial schemas
Browse files Browse the repository at this point in the history
fixes: confluentinc#4566

With this change users can now supply just the key schema and use schema inference to get the value columns. For example, if the key is an `INT` serialized using Kafka's `IntegerSerializer` and the value is an Avro record with the schema stored in the Scheme Registry, then such a stream can be registered in ksqlDB with a statement such as:

```sql
-- note: only the key columns are provided between the first set of brackets
-- the value columns will be inferred from the Avro schema in the Schema Registry
CREATE STREAM users (ROWKEY INT KET) WITH (kafka_topic='users', value_format='avro');
```
  • Loading branch information
big-andy-coates committed Feb 25, 2020
1 parent eb07370 commit 87dbc04
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

package io.confluent.ksql.schema.ksql.inference;

import com.google.common.collect.Iterables;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.SchemaParser;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter.Option;
Expand All @@ -35,20 +36,25 @@
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Schema;

/**
* An injector which injects the schema into the supplied {@code statement}.
* An injector which injects the value columns into the supplied {@code statement}.
*
* <p>The schema is only injected if:
* <p>The value columns are only injected if:
* <ul>
* <li>The statement is a CT/CS.</li>
* <li>The statement does not defined a schema.</li>
* <li>The statement does not defined any value columns.</li>
* <li>The format of the statement supports schema inference.</li>
* </ul>
*
* <p>Any key columns present are passed through unchanged.
*
* <p>If any of the above are not true then the {@code statement} is returned unchanged.
*/
public class DefaultSchemaInjector implements Injector {
Expand All @@ -62,7 +68,6 @@ public DefaultSchemaInjector(final TopicSchemaSupplier schemaSupplier) {
this.schemaSupplier = Objects.requireNonNull(schemaSupplier, "schemaSupplier");
}


@SuppressWarnings("unchecked")
@Override
public <T extends Statement> ConfiguredStatement<T> inject(
Expand Down Expand Up @@ -90,17 +95,18 @@ public <T extends Statement> ConfiguredStatement<T> inject(
private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(
final ConfiguredStatement<CreateSource> statement
) {
if (hasElements(statement)
if (hasValueElements(statement)
|| !statement.getStatement().getProperties().getValueFormat().supportsSchemaInference()) {
return Optional.empty();
}

final SchemaAndId valueSchema = getValueSchema(statement);
final CreateSource withSchema = addSchemaFields(statement, valueSchema);
final PreparedStatement<CreateSource> prepared =
buildPreparedStatement(withSchema);
return Optional.of(ConfiguredStatement.of(
prepared, statement.getOverrides(), statement.getConfig()));
final PreparedStatement<CreateSource> prepared = buildPreparedStatement(withSchema);
final ConfiguredStatement<CreateSource> configured = ConfiguredStatement
.of(prepared, statement.getOverrides(), statement.getConfig());

return Optional.of(configured);
}

private SchemaAndId getValueSchema(
Expand All @@ -123,10 +129,11 @@ private SchemaAndId getValueSchema(
return result.schemaAndId.get();
}

private static boolean hasElements(
private static boolean hasValueElements(
final ConfiguredStatement<CreateSource> statement
) {
return !Iterables.isEmpty(statement.getStatement().getElements());
return statement.getStatement().getElements().stream()
.anyMatch(e -> e.getNamespace().equals(Namespace.VALUE));
}

private static CreateSource addSchemaFields(
Expand All @@ -145,23 +152,44 @@ private static CreateSource addSchemaFields(
}

private static TableElements buildElements(
final Schema schema,
final Schema valueSchema,
final ConfiguredStatement<CreateSource> preparedStatement
) {
try {
throwOnInvalidSchema(schema);
// custom types cannot be injected, so we can pass in an EMPTY type registry
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY);
} catch (final Exception e) {
throw new KsqlStatementException(
"Failed to convert schema to KSQL model: " + e.getMessage(),
preparedStatement.getStatementText(),
e);
}
throwOnInvalidSchema(valueSchema);

final List<TableElement> elements = new ArrayList<>();

getKeyColumns(preparedStatement)
.forEach(elements::add);

getColumnsFromSchema(valueSchema)
.forEach(elements::add);

return TableElements.of(elements);
}

private static Stream<TableElement> getKeyColumns(
final ConfiguredStatement<CreateSource> preparedStatement
) {
return preparedStatement.getStatement().getElements().stream()
.filter(e -> e.getNamespace() == Namespace.KEY);
}

private static Stream<TableElement> getColumnsFromSchema(final Schema schema) {
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY).stream();
}

private static void throwOnInvalidSchema(final Schema schema) {
SchemaConverters.connectToSqlConverter().toSqlType(schema);
try {
SchemaConverters.connectToSqlConverter().toSqlType(schema);
} catch (final Exception e) {
throw new KsqlException(
"Schema contains types not supported by KSQL: " + e.getMessage()
+ System.lineSeparator()
+ "Schema: " + schema,
e
);
}
}

private static PreparedStatement<CreateSource> buildPreparedStatement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import static io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaAndId.schemaAndId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -381,7 +379,7 @@ public void shouldAddSchemaIdIfNotPresentAlready() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId().get(), is(SCHEMA_ID));
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(SCHEMA_ID)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID=5"));
}
Expand All @@ -395,8 +393,7 @@ public void shouldNotOverwriteExistingSchemaId() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId().get(),
is(42));
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(42)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID='42'"));
}
Expand All @@ -416,7 +413,7 @@ public void shouldThrowOnUnsupportedType() {
fail("Expected KsqlStatementException. schema: " + unsupportedSchema);
} catch (final KsqlStatementException e) {
assertThat(e.getRawMessage(),
containsString("Failed to convert schema to KSQL model:"));
containsString("Schema contains types not supported by KSQL:"));

assertThat(e.getSqlStatement(), is(csStatement.getStatementText()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,84 @@
"inputs": [{"topic": "input", "value": {"c1": 4}}],
"outputs": [{"topic": "OUTPUT", "value": {"C1": 4}}]
},
{
"name": "with invalid or reserved words in schema - JSON",
"statements": [
"CREATE STREAM INPUT WITH (kafka_topic='input', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT * FROM input;"
],
"topics": [
{
"name": "input",
"schema": {"type": "object","properties": {
"@timestamp": {"type": "integer"},
"from": {"type": "integer"}
}},
"format": "JSON"
},
{
"name": "OUTPUT",
"format": "JSON"
}
],
"inputs": [{"topic": "input", "value": {"@timestamp": 4, "from": 5}}],
"outputs": [{"topic": "OUTPUT", "value": {"@TIMESTAMP": 4, "FROM": 5}}],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, `@TIMESTAMP` BIGINT, `FROM` BIGINT"}
]
}
},
{
"name": "validate without value elements OK - AVRO",
"statements": [
"CREATE STREAM INPUT (rowkey int key) WITH (kafka_topic='input', value_format='AvRo');",
"CREATE STREAM OUTPUT AS SELECT * FROM input;"
],
"topics": [
{
"name": "input",
"schema": {"name": "blah", "type": "record", "fields": [{"name": "c1", "type": "int"}]},
"format": "AVRO"
},
{
"name": "OUTPUT",
"format": "AVRO"
}
],
"inputs": [{"topic": "input", "key": 42, "value": {"c1": 4}}],
"outputs": [{"topic": "OUTPUT", "key": 42, "value": {"C1": 4}}],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "ROWKEY INT KEY, `C1` INT"}
]
}
},
{
"name": "validate without value elements OK - JSON SCHEMA",
"statements": [
"CREATE STREAM INPUT (rowkey int key) WITH (kafka_topic='input', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT * FROM input;"
],
"topics": [
{
"name": "input",
"schema": {"type": "object","properties": {"c1": {"type": "integer"}}},
"format": "JSON"
},
{
"name": "OUTPUT",
"format": "JSON"
}
],
"inputs": [{"topic": "input", "key": 42, "value": {"c1": 4}}],
"outputs": [{"topic": "OUTPUT", "key": 42, "value": {"C1": 4}}],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "ROWKEY INT KEY, `C1` BIGINT"}
]
}
},
{
"name": "validate with elements OK",
"format": ["JSON", "PROTOBUF"],
Expand Down

0 comments on commit 87dbc04

Please sign in to comment.