fix: support partial schemas #4625

Merged
DefaultSchemaInjector.java
@@ -15,14 +15,15 @@

package io.confluent.ksql.schema.ksql.inference;

import com.google.common.collect.Iterables;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.SchemaParser;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.parser.tree.TableElement.Namespace;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter.Option;
@@ -35,20 +36,25 @@
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Schema;

/**
* An injector which injects the schema into the supplied {@code statement}.
* An injector which injects the value columns into the supplied {@code statement}.
*
* <p>The schema is only injected if:
* <p>The value columns are only injected if:
* <ul>
* <li>The statement is a CT/CS.</li>
* <li>The statement does not define a schema.</li>
* <li>The statement does not define any value columns.</li>
* <li>The format of the statement supports schema inference.</li>
* </ul>
*
* <p>Any key columns present are passed through unchanged.
*
* <p>If any of the above are not true then the {@code statement} is returned unchanged.
*/
public class DefaultSchemaInjector implements Injector {
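A statement that qualifies under the rules listed in the Javadoc above, for illustration (the stream name, topic, and columns here are assumed, not taken from this PR): a CREATE STREAM that declares only its key column, in a value format that supports inference:

```sql
-- Only the key column is declared; no value columns, AVRO value format.
-- The injector fetches the value schema from the Schema Registry and
-- rewrites the statement with the inferred value columns appended.
CREATE STREAM users (ROWKEY INT KEY)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
```

After injection, the rewritten statement carries the declared key column followed by the inferred value columns, in the spirit of `CREATE STREAM users (ROWKEY INT KEY, NAME STRING, AGE INT) WITH (...)` (value column names here are illustrative).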
@@ -62,7 +68,6 @@ public DefaultSchemaInjector(final TopicSchemaSupplier schemaSupplier) {
this.schemaSupplier = Objects.requireNonNull(schemaSupplier, "schemaSupplier");
}


@SuppressWarnings("unchecked")
@Override
public <T extends Statement> ConfiguredStatement<T> inject(
@@ -90,17 +95,18 @@ public <T extends Statement> ConfiguredStatement<T> inject(
private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(
final ConfiguredStatement<CreateSource> statement
) {
if (hasElements(statement)
if (hasValueElements(statement)
|| !statement.getStatement().getProperties().getValueFormat().supportsSchemaInference()) {
return Optional.empty();
}

final SchemaAndId valueSchema = getValueSchema(statement);
final CreateSource withSchema = addSchemaFields(statement, valueSchema);
final PreparedStatement<CreateSource> prepared =
buildPreparedStatement(withSchema);
return Optional.of(ConfiguredStatement.of(
prepared, statement.getOverrides(), statement.getConfig()));
final PreparedStatement<CreateSource> prepared = buildPreparedStatement(withSchema);
final ConfiguredStatement<CreateSource> configured = ConfiguredStatement
.of(prepared, statement.getOverrides(), statement.getConfig());

return Optional.of(configured);
}

private SchemaAndId getValueSchema(
@@ -123,10 +129,11 @@ private SchemaAndId getValueSchema(
return result.schemaAndId.get();
}

private static boolean hasElements(
private static boolean hasValueElements(
final ConfiguredStatement<CreateSource> statement
) {
return !Iterables.isEmpty(statement.getStatement().getElements());
return statement.getStatement().getElements().stream()
.anyMatch(e -> e.getNamespace().equals(Namespace.VALUE));
}

private static CreateSource addSchemaFields(
@@ -145,23 +152,44 @@
}

private static TableElements buildElements(
final Schema schema,
final Schema valueSchema,
final ConfiguredStatement<CreateSource> preparedStatement
) {
try {
throwOnInvalidSchema(schema);
// custom types cannot be injected, so we can pass in an EMPTY type registry
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY);
} catch (final Exception e) {
throw new KsqlStatementException(
"Failed to convert schema to KSQL model: " + e.getMessage(),
preparedStatement.getStatementText(),
e);
}
throwOnInvalidSchema(valueSchema);

final List<TableElement> elements = new ArrayList<>();

getKeyColumns(preparedStatement)
.forEach(elements::add);

getColumnsFromSchema(valueSchema)
.forEach(elements::add);

return TableElements.of(elements);
}

private static Stream<TableElement> getKeyColumns(
final ConfiguredStatement<CreateSource> preparedStatement
) {
return preparedStatement.getStatement().getElements().stream()
.filter(e -> e.getNamespace() == Namespace.KEY);
}

private static Stream<TableElement> getColumnsFromSchema(final Schema schema) {
return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY).stream();
}

private static void throwOnInvalidSchema(final Schema schema) {
SchemaConverters.connectToSqlConverter().toSqlType(schema);
try {
SchemaConverters.connectToSqlConverter().toSqlType(schema);
} catch (final Exception e) {
throw new KsqlException(
"Schema contains types not supported by KSQL: " + e.getMessage()
+ System.lineSeparator()
+ "Schema: " + schema,
e
);
}
}

private static PreparedStatement<CreateSource> buildPreparedStatement(
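Note on the reworked error handling above: previously, any exception raised while converting the Connect schema was caught in buildElements and re-thrown generically as "Failed to convert schema to KSQL model". With this change the conversion is validated up front in throwOnInvalidSchema, which raises a KsqlException naming the problem ("Schema contains types not supported by KSQL") and including the offending schema in the message; the updated test below asserts on the new message.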
DefaultSchemaInjectorTest.java
@@ -18,9 +18,7 @@
import static io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaAndId.schemaAndId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.fail;
@@ -381,7 +379,7 @@ public void shouldAddSchemaIdIfNotPresentAlready() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId().get(), is(SCHEMA_ID));
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(SCHEMA_ID)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID=5"));
}
@@ -395,8 +393,7 @@ public void shouldNotOverwriteExistingSchemaId() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId().get(),
is(42));
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(42)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID='42'"));
}
@@ -416,7 +413,7 @@ public void shouldThrowOnUnsupportedType() {
fail("Expected KsqlStatementException. schema: " + unsupportedSchema);
} catch (final KsqlStatementException e) {
assertThat(e.getRawMessage(),
containsString("Failed to convert schema to KSQL model:"));
containsString("Schema contains types not supported by KSQL:"));

assertThat(e.getSqlStatement(), is(csStatement.getStatementText()));
}
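A small test-style improvement rides along with the message change: the Optional-returning property is now matched as a whole, e.g. `assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(SCHEMA_ID)))`, rather than calling `.get()` first. On failure this reports the actual Optional value instead of throwing NoSuchElementException before the matcher can run, which also makes the `instanceOf` and `not` imports removed above unnecessary.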
PlannedTestGeneratorTest.java
@@ -24,8 +24,9 @@ public class PlannedTestGeneratorTest {
/**
* Run this test to generate new query plans for the {@link QueryTranslationTest} test cases.
*
* <p>Check the new query plans in with your change. Otherwise, {@link PlannedTestsUpToDateTest}
* will fail if there are missing or changed query plans.
* <p>Ensure only the test plans you expected have changed, then check the new query plans in
* with your change. Otherwise, {@link PlannedTestsUpToDateTest} will fail if there are missing
* or changed query plans.
*/
@Test
@Ignore
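The generator stays @Ignore'd so it never runs in CI: the intended workflow, per the Javadoc, is to run it manually after a change that affects query plans, verify that only the expected plans changed, and commit the regenerated artifacts, such as the plan, spec, and topology files added below.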
plan.json (new file)
@@ -0,0 +1,144 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 INTEGER) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='AvRo');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent3048982612619405901",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
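The persisted plan records the statement after schema injection: the schema string "`ROWKEY` INTEGER KEY, `C1` INTEGER" contains both the declared key column and the value column inferred from the registered Avro schema (selected via AVRO_SCHEMA_ID=1). The pre-injection statement was presumably the partial form, reconstructed here for illustration:

```sql
-- Assumed source statement before injection (only the key column declared):
CREATE STREAM INPUT (ROWKEY INTEGER KEY)
  WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='AvRo');
```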
spec.json (new file)
@@ -0,0 +1,22 @@
{
"version" : "6.0.0",
"timestamp" : 1582711670275,
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 INT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 INT> NOT NULL"
},
"inputs" : [ {
"topic" : "input",
"key" : 42,
"value" : {
"c1" : 4
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : 42,
"value" : {
"C1" : 4
}
} ]
}
topology (new file)
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project
